mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-23 10:11:28 +00:00
* fix(ec): VolumeEcShardsInfo walks every disk on multi-disk servers When a volume server holds EC shards for the same vid across more than one disk, each DiskLocation registers its own EcVolume entry and Store.FindEcVolume returns whichever one it hits first. The shard-info RPC iterated only that single EcVolume's Shards, so the response missed every shard mounted on a sibling disk. The worker's verifyEcShardsBeforeDelete sums the per-server responses into a union bitmap and refuses to delete the source volume when the union falls short of dataShards+parityShards. On multi-disk destinations, the union was systematically under-counted and source deletion got blocked even though all shards were physically present and mounted. Walk every DiskLocation in the handler and emit the deduplicated union of all shards. The .ecx-backed fields (file counts, volume size) still come from a single EcVolume since every disk's entry opens the same .ecx via NewEcVolume's cross-disk fallback. Tests: - TestVolumeEcShardsInfo_AggregatesAcrossDisks unit test in weed/server/. - test/volume_server/grpc/ec_verify_multi_disk_test.go integration test drives the full generate -> mount -> redistribute -> restart -> reconcile path and asserts both VolumeEcShardsInfo and VerifyShardsAcrossServers + RequireFullShardSet (the production source-deletion gate) report all 14 shards. - ec_multi_disk_lifecycle_test.go tightened: replaces the "VolumeEcShardsInfo only sees one disk's EcVolume" workaround with a full-shard-set assertion. * review: use ShardBits bitmask + cap-pre-allocation for shard dedup
203 lines
6.2 KiB
Go
203 lines
6.2 KiB
Go
package weed_server
|
|
|
|
import (
|
|
"context"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"testing"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
)
|
|
|
|
func TestCheckEcVolumeStatusCountOnlyDataShards(t *testing.T) {
|
|
tempDir := t.TempDir()
|
|
dataDir := filepath.Join(tempDir, "data")
|
|
idxDir := filepath.Join(tempDir, "idx")
|
|
if err := os.MkdirAll(dataDir, 0o755); err != nil {
|
|
t.Fatalf("mkdir data dir: %v", err)
|
|
}
|
|
if err := os.MkdirAll(idxDir, 0o755); err != nil {
|
|
t.Fatalf("mkdir idx dir: %v", err)
|
|
}
|
|
|
|
baseName := "7"
|
|
filesToCreate := []string{
|
|
filepath.Join(dataDir, baseName+".ec00"),
|
|
filepath.Join(dataDir, baseName+".ec09"),
|
|
filepath.Join(dataDir, baseName+".ec13"),
|
|
filepath.Join(idxDir, baseName+".ecx"),
|
|
filepath.Join(idxDir, baseName+".ecj"),
|
|
filepath.Join(idxDir, baseName+".idx"),
|
|
}
|
|
for _, fileName := range filesToCreate {
|
|
if err := os.WriteFile(fileName, []byte("x"), 0o644); err != nil {
|
|
t.Fatalf("create %s: %v", fileName, err)
|
|
}
|
|
}
|
|
|
|
location := &storage.DiskLocation{
|
|
Directory: dataDir,
|
|
IdxDirectory: idxDir,
|
|
}
|
|
|
|
hasEcxFile, hasIdxFile, shardCount, err := checkEcVolumeStatus(baseName, location)
|
|
if err != nil {
|
|
t.Fatalf("checkEcVolumeStatus: %v", err)
|
|
}
|
|
|
|
if !hasEcxFile {
|
|
t.Fatalf("expected hasEcxFile=true")
|
|
}
|
|
if !hasIdxFile {
|
|
t.Fatalf("expected hasIdxFile=true")
|
|
}
|
|
if shardCount != 3 {
|
|
t.Fatalf("expected shardCount=3, got %d", shardCount)
|
|
}
|
|
}
|
|
|
|
// TestVolumeEcShardsInfo_AggregatesAcrossDisks pins the multi-disk path:
|
|
// when a volume server mounts EC shards for the same volume on more than
|
|
// one disk (each disk holds its own EcVolume entry — Store.FindEcVolume
|
|
// returns only the first), VolumeEcShardsInfo used to report shards from
|
|
// a single disk. The ec.encode verification step (verifyEcShardsBeforeDelete)
|
|
// then refused to delete the source volume because the union across
|
|
// servers fell short of dataShards + parityShards. The handler must walk
|
|
// every DiskLocation so the response covers every shard the server holds.
|
|
func TestVolumeEcShardsInfo_AggregatesAcrossDisks(t *testing.T) {
|
|
tempDir := t.TempDir()
|
|
dir0 := filepath.Join(tempDir, "disk0")
|
|
dir1 := filepath.Join(tempDir, "disk1")
|
|
for _, d := range []string{dir0, dir1} {
|
|
if err := os.MkdirAll(d, 0o755); err != nil {
|
|
t.Fatalf("mkdir %s: %v", d, err)
|
|
}
|
|
}
|
|
|
|
const collection = "ec-multi-disk-info"
|
|
vid := needle.VolumeId(42)
|
|
const dataShards, parityShards = 10, 4
|
|
const datSize int64 = 10 * 1024 * 1024
|
|
|
|
// Two shards on disk0, two on disk1. The .ecx / .ecj / .vif live on
|
|
// disk0 so each disk's EcVolume can open the index files via the
|
|
// cross-disk fallback in NewEcVolume.
|
|
shardsOnDisk0 := []erasure_coding.ShardId{0, 5}
|
|
shardsOnDisk1 := []erasure_coding.ShardId{7, 12}
|
|
|
|
store := storage.NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id",
|
|
[]string{dir0, dir1},
|
|
[]int32{100, 100},
|
|
[]util.MinFreeSpace{{}, {}},
|
|
"",
|
|
storage.NeedleMapInMemory,
|
|
[]types.DiskType{types.HardDriveType, types.HardDriveType},
|
|
nil,
|
|
3,
|
|
)
|
|
done := make(chan struct{})
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-store.NewEcShardsChan:
|
|
case <-store.NewVolumesChan:
|
|
case <-store.DeletedVolumesChan:
|
|
case <-store.DeletedEcShardsChan:
|
|
case <-store.StateUpdateChan:
|
|
case <-done:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
t.Cleanup(func() {
|
|
store.Close()
|
|
close(done)
|
|
})
|
|
|
|
base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid))
|
|
base1 := erasure_coding.EcShardFileName(collection, dir1, int(vid))
|
|
|
|
// .ecx, .ecj, .vif live on disk0. NewEcVolume on disk1 falls back to
|
|
// disk0's idx dir. The .ecx needs >0 bytes so HasEcxFileOnDisk does
|
|
// not treat it as the corrupt-stub case; a single zero entry is the
|
|
// smallest valid index file (WalkIndex iterates one zero-sized needle).
|
|
if err := os.WriteFile(base0+".ecx", make([]byte, 16), 0o644); err != nil {
|
|
t.Fatalf("write .ecx: %v", err)
|
|
}
|
|
if err := os.WriteFile(base0+".ecj", nil, 0o644); err != nil {
|
|
t.Fatalf("write .ecj: %v", err)
|
|
}
|
|
if err := volume_info.SaveVolumeInfo(base0+".vif", &volume_server_pb.VolumeInfo{
|
|
Version: uint32(needle.Version3),
|
|
DatFileSize: datSize,
|
|
EcShardConfig: &volume_server_pb.EcShardConfig{
|
|
DataShards: dataShards,
|
|
ParityShards: parityShards,
|
|
},
|
|
}); err != nil {
|
|
t.Fatalf("save .vif: %v", err)
|
|
}
|
|
|
|
plant := func(base string, shardId erasure_coding.ShardId) {
|
|
t.Helper()
|
|
f, err := os.Create(base + erasure_coding.ToExt(int(shardId)))
|
|
if err != nil {
|
|
t.Fatalf("create shard %d: %v", shardId, err)
|
|
}
|
|
// MountEcShards does not validate shard size, so any non-empty
|
|
// truncate avoids the zero-byte ignore branch in loadAllEcShards.
|
|
if err := f.Truncate(1); err != nil {
|
|
f.Close()
|
|
t.Fatalf("truncate shard %d: %v", shardId, err)
|
|
}
|
|
f.Close()
|
|
}
|
|
for _, sid := range shardsOnDisk0 {
|
|
plant(base0, sid)
|
|
}
|
|
for _, sid := range shardsOnDisk1 {
|
|
plant(base1, sid)
|
|
}
|
|
|
|
for _, sid := range append([]erasure_coding.ShardId{}, append(shardsOnDisk0, shardsOnDisk1...)...) {
|
|
if err := store.MountEcShards(collection, vid, sid, ""); err != nil {
|
|
t.Fatalf("MountEcShards %d.%d: %v", vid, sid, err)
|
|
}
|
|
}
|
|
|
|
vs := &VolumeServer{store: store}
|
|
resp, err := vs.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{
|
|
VolumeId: uint32(vid),
|
|
})
|
|
if err != nil {
|
|
t.Fatalf("VolumeEcShardsInfo: %v", err)
|
|
}
|
|
|
|
gotShardIds := make([]int, 0, len(resp.GetEcShardInfos()))
|
|
for _, info := range resp.GetEcShardInfos() {
|
|
if info.GetVolumeId() != uint32(vid) {
|
|
t.Errorf("EcShardInfo VolumeId=%d, want %d", info.GetVolumeId(), vid)
|
|
}
|
|
gotShardIds = append(gotShardIds, int(info.GetShardId()))
|
|
}
|
|
sort.Ints(gotShardIds)
|
|
|
|
want := []int{0, 5, 7, 12}
|
|
if len(gotShardIds) != len(want) {
|
|
t.Fatalf("VolumeEcShardsInfo returned %d shards (ids=%v), want %d (ids=%v)",
|
|
len(gotShardIds), gotShardIds, len(want), want)
|
|
}
|
|
for i, sid := range want {
|
|
if gotShardIds[i] != sid {
|
|
t.Fatalf("VolumeEcShardsInfo shard ids=%v, want %v", gotShardIds, want)
|
|
}
|
|
}
|
|
}
|