Files
seaweedfs/weed/server/volume_grpc_erasure_coding_test.go
Chris Lu 77ac781bbd fix(ec): VolumeEcShardsInfo walks every disk on multi-disk servers (#9568)
* fix(ec): VolumeEcShardsInfo walks every disk on multi-disk servers

When a volume server holds EC shards for the same vid across more than
one disk, each DiskLocation registers its own EcVolume entry and
Store.FindEcVolume returns whichever one it hits first. The shard-info
RPC iterated only that single EcVolume's Shards, so the response missed
every shard mounted on a sibling disk.

The worker's verifyEcShardsBeforeDelete sums the per-server responses
into a union bitmap and refuses to delete the source volume when the
union falls short of dataShards+parityShards. On multi-disk
destinations, the union was systematically under-counted and source
deletion got blocked even though all shards were physically present and
mounted.

Walk every DiskLocation in the handler and emit the deduplicated union
of all shards. The .ecx-backed fields (file counts, volume size) still
come from a single EcVolume since every disk's entry opens the same
.ecx via NewEcVolume's cross-disk fallback.

Tests:
- TestVolumeEcShardsInfo_AggregatesAcrossDisks unit test in
  weed/server/.
- test/volume_server/grpc/ec_verify_multi_disk_test.go integration test
  drives the full generate -> mount -> redistribute -> restart ->
  reconcile path and asserts both VolumeEcShardsInfo and
  VerifyShardsAcrossServers + RequireFullShardSet (the production
  source-deletion gate) report all 14 shards.
- ec_multi_disk_lifecycle_test.go tightened: replaces the
  "VolumeEcShardsInfo only sees one disk's EcVolume" workaround with a
  full-shard-set assertion.

* review: use ShardBits bitmask + cap-pre-allocation for shard dedup
2026-05-19 14:58:56 -07:00

203 lines
6.2 KiB
Go

package weed_server
import (
"context"
"os"
"path/filepath"
"sort"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// TestCheckEcVolumeStatusCountOnlyDataShards plants three .ecNN shard files in
// a data directory and .ecx/.ecj/.idx files in a separate index directory, then
// checks that checkEcVolumeStatus reports the .ecx and .idx files as present
// and returns a shard count of 3 (the number of .ecNN files written).
func TestCheckEcVolumeStatusCountOnlyDataShards(t *testing.T) {
	root := t.TempDir()
	dataDir, idxDir := filepath.Join(root, "data"), filepath.Join(root, "idx")
	if err := os.MkdirAll(dataDir, 0o755); err != nil {
		t.Fatalf("mkdir data dir: %v", err)
	}
	if err := os.MkdirAll(idxDir, 0o755); err != nil {
		t.Fatalf("mkdir idx dir: %v", err)
	}

	const baseName = "7"

	// Each fixture is a one-byte file named <baseName><ext> under dir.
	fixtures := []struct{ dir, ext string }{
		{dataDir, ".ec00"},
		{dataDir, ".ec09"},
		{dataDir, ".ec13"},
		{idxDir, ".ecx"},
		{idxDir, ".ecj"},
		{idxDir, ".idx"},
	}
	for _, fx := range fixtures {
		path := filepath.Join(fx.dir, baseName+fx.ext)
		if err := os.WriteFile(path, []byte("x"), 0o644); err != nil {
			t.Fatalf("create %s: %v", path, err)
		}
	}

	loc := &storage.DiskLocation{Directory: dataDir, IdxDirectory: idxDir}
	hasEcx, hasIdx, shards, err := checkEcVolumeStatus(baseName, loc)

	// Every case is fatal, so only the first failing condition is reported.
	switch {
	case err != nil:
		t.Fatalf("checkEcVolumeStatus: %v", err)
	case !hasEcx:
		t.Fatalf("expected hasEcxFile=true")
	case !hasIdx:
		t.Fatalf("expected hasIdxFile=true")
	case shards != 3:
		t.Fatalf("expected shardCount=3, got %d", shards)
	}
}
// TestVolumeEcShardsInfo_AggregatesAcrossDisks is a regression test for the
// multi-disk case. A volume server may mount EC shards of one volume on
// several disks; each disk then keeps its own EcVolume entry and
// Store.FindEcVolume hands back only the first one. VolumeEcShardsInfo
// previously reported the shards of that single disk, so the ec.encode
// verification step (verifyEcShardsBeforeDelete) saw a cross-server union
// smaller than dataShards + parityShards and refused to delete the source
// volume. The handler has to visit every DiskLocation so that its response
// lists every shard held by the server.
func TestVolumeEcShardsInfo_AggregatesAcrossDisks(t *testing.T) {
	root := t.TempDir()
	disk0 := filepath.Join(root, "disk0")
	disk1 := filepath.Join(root, "disk1")
	for _, dir := range []string{disk0, disk1} {
		if err := os.MkdirAll(dir, 0o755); err != nil {
			t.Fatalf("mkdir %s: %v", dir, err)
		}
	}

	const (
		collection   = "ec-multi-disk-info"
		dataShards   = 10
		parityShards = 4
	)
	const datSize int64 = 10 * 1024 * 1024
	vid := needle.VolumeId(42)

	store := storage.NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id",
		[]string{disk0, disk1},
		[]int32{100, 100},
		[]util.MinFreeSpace{{}, {}},
		"",
		storage.NeedleMapInMemory,
		[]types.DiskType{types.HardDriveType, types.HardDriveType},
		nil,
		3,
	)

	// Discard everything arriving on the store's notification channels until
	// the test ends (presumably so store operations publishing to them never
	// block on a missing receiver).
	stop := make(chan struct{})
	go func() {
		for {
			select {
			case <-store.NewEcShardsChan:
			case <-store.NewVolumesChan:
			case <-store.DeletedVolumesChan:
			case <-store.DeletedEcShardsChan:
			case <-store.StateUpdateChan:
			case <-stop:
				return
			}
		}
	}()
	t.Cleanup(func() {
		// Close the store first; only then stop the drain goroutine.
		store.Close()
		close(stop)
	})

	base0 := erasure_coding.EcShardFileName(collection, disk0, int(vid))
	base1 := erasure_coding.EcShardFileName(collection, disk1, int(vid))

	// The index side files (.ecx, .ecj, .vif) exist on disk0 only; the
	// EcVolume on disk1 reaches them through NewEcVolume's cross-disk
	// fallback. The .ecx must be non-empty so HasEcxFileOnDisk does not
	// classify it as a corrupt stub: one all-zero entry is the smallest valid
	// index (WalkIndex iterates a single zero-sized needle).
	ecxPath, ecjPath, vifPath := base0+".ecx", base0+".ecj", base0+".vif"
	if err := os.WriteFile(ecxPath, make([]byte, 16), 0o644); err != nil {
		t.Fatalf("write .ecx: %v", err)
	}
	if err := os.WriteFile(ecjPath, nil, 0o644); err != nil {
		t.Fatalf("write .ecj: %v", err)
	}
	volInfo := &volume_server_pb.VolumeInfo{
		Version:     uint32(needle.Version3),
		DatFileSize: datSize,
		EcShardConfig: &volume_server_pb.EcShardConfig{
			DataShards:   dataShards,
			ParityShards: parityShards,
		},
	}
	if err := volume_info.SaveVolumeInfo(vifPath, volInfo); err != nil {
		t.Fatalf("save .vif: %v", err)
	}

	// plant creates a one-byte shard file for shardId next to base.
	plant := func(base string, shardId erasure_coding.ShardId) {
		t.Helper()
		shardFile, err := os.Create(base + erasure_coding.ToExt(int(shardId)))
		if err != nil {
			t.Fatalf("create shard %d: %v", shardId, err)
		}
		// MountEcShards performs no shard-size validation; a non-empty file
		// is enough to stay clear of the zero-byte ignore branch in
		// loadAllEcShards.
		truncErr := shardFile.Truncate(1)
		shardFile.Close()
		if truncErr != nil {
			t.Fatalf("truncate shard %d: %v", shardId, truncErr)
		}
	}

	// Two shards per disk: 0 and 5 on disk0, 7 and 12 on disk1.
	placements := []struct {
		base   string
		shards []erasure_coding.ShardId
	}{
		{base0, []erasure_coding.ShardId{0, 5}},
		{base1, []erasure_coding.ShardId{7, 12}},
	}
	var mountOrder []erasure_coding.ShardId
	for _, p := range placements {
		for _, sid := range p.shards {
			plant(p.base, sid)
			mountOrder = append(mountOrder, sid)
		}
	}
	// Mount only after every shard file exists, disk0's shards first.
	for _, sid := range mountOrder {
		if err := store.MountEcShards(collection, vid, sid, ""); err != nil {
			t.Fatalf("MountEcShards %d.%d: %v", vid, sid, err)
		}
	}

	vs := &VolumeServer{store: store}
	req := &volume_server_pb.VolumeEcShardsInfoRequest{VolumeId: uint32(vid)}
	resp, err := vs.VolumeEcShardsInfo(context.Background(), req)
	if err != nil {
		t.Fatalf("VolumeEcShardsInfo: %v", err)
	}

	infos := resp.GetEcShardInfos()
	got := make([]int, len(infos))
	for i, info := range infos {
		if info.GetVolumeId() != uint32(vid) {
			t.Errorf("EcShardInfo VolumeId=%d, want %d", info.GetVolumeId(), vid)
		}
		got[i] = int(info.GetShardId())
	}
	// Response order is not asserted; compare as a sorted list.
	sort.Ints(got)

	want := []int{0, 5, 7, 12}
	if len(got) != len(want) {
		t.Fatalf("VolumeEcShardsInfo returned %d shards (ids=%v), want %d (ids=%v)",
			len(got), got, len(want), want)
	}
	for i := range want {
		if got[i] != want[i] {
			t.Fatalf("VolumeEcShardsInfo shard ids=%v, want %v", got, want)
		}
	}
}