fix(ec): VolumeEcShardsInfo walks every disk on multi-disk servers (#9568)

* fix(ec): VolumeEcShardsInfo walks every disk on multi-disk servers

When a volume server holds EC shards for the same vid across more than
one disk, each DiskLocation registers its own EcVolume entry and
Store.FindEcVolume returns whichever one it hits first. The shard-info
RPC iterated only that single EcVolume's Shards, so the response missed
every shard mounted on a sibling disk.

The worker's verifyEcShardsBeforeDelete sums the per-server responses
into a union bitmap and refuses to delete the source volume when the
union falls short of dataShards+parityShards. On multi-disk
destinations, the union was systematically under-counted and source
deletion got blocked even though all shards were physically present and
mounted.

Walk every DiskLocation in the handler and emit the deduplicated union
of all shards. The .ecx-backed fields (file counts, volume size) still
come from a single EcVolume since every disk's entry opens the same
.ecx via NewEcVolume's cross-disk fallback.

Tests:
- TestVolumeEcShardsInfo_AggregatesAcrossDisks unit test in
  weed/server/.
- test/volume_server/grpc/ec_verify_multi_disk_test.go integration test
  drives the full generate -> mount -> redistribute -> restart ->
  reconcile path and asserts both VolumeEcShardsInfo and
  VerifyShardsAcrossServers + RequireFullShardSet (the production
  source-deletion gate) report all 14 shards.
- ec_multi_disk_lifecycle_test.go tightened: replaces the
  "VolumeEcShardsInfo only sees one disk's EcVolume" workaround with a
  full-shard-set assertion.

* review: use ShardBits bitmask + cap-pre-allocation for shard dedup
This commit is contained in:
Chris Lu
2026-05-19 14:58:56 -07:00
committed by GitHub
parent f72983c1fd
commit 77ac781bbd
4 changed files with 368 additions and 14 deletions

View File

@@ -136,8 +136,6 @@ func TestEcLifecycleAcrossMultipleDisks(t *testing.T) {
conn2, grpcClient2 := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
defer conn2.Close()
// VolumeEcShardsInfo only sees one disk's EcVolume; filesystem layout is
// the ground truth for the whole-store shard count.
postReconcileLayout := scanShardLayout(t, dataDirs, collection, volumeID)
if got, want := totalShardsInLayout(postReconcileLayout), erasure_coding.TotalShardsCount; got != want {
t.Fatalf("post-reconcile: total shards on disk mismatch: got %d, want %d (layout=%v)", got, want, postReconcileLayout)
@@ -148,11 +146,28 @@ func TestEcLifecycleAcrossMultipleDisks(t *testing.T) {
if got, want := len(postReconcileLayout[1]), splitAt; got != want {
t.Fatalf("post-reconcile: disk 1 shard count drift: got %d, want %d (layout=%v)", got, want, postReconcileLayout)
}
if _, err := grpcClient2.VolumeEcShardsInfo(ctx, &volume_server_pb.VolumeEcShardsInfoRequest{
// VolumeEcShardsInfo must walk every DiskLocation and report the full
// shard set — the verification step in ec_task.go gates source-volume
// deletion on this RPC returning a complete shard inventory.
infoResp, err := grpcClient2.VolumeEcShardsInfo(ctx, &volume_server_pb.VolumeEcShardsInfoRequest{
VolumeId: volumeID,
}); err != nil {
})
if err != nil {
t.Fatalf("VolumeEcShardsInfo after redistribute restart: %v", err)
}
if got, want := len(infoResp.GetEcShardInfos()), erasure_coding.TotalShardsCount; got != want {
t.Fatalf("VolumeEcShardsInfo after redistribute restart: got %d shards, want %d (per-disk layout=%v)",
got, want, postReconcileLayout)
}
gotShardIds := make(map[uint32]struct{}, len(infoResp.GetEcShardInfos()))
for _, info := range infoResp.GetEcShardInfos() {
gotShardIds[info.GetShardId()] = struct{}{}
}
for shardId := uint32(0); shardId < uint32(erasure_coding.TotalShardsCount); shardId++ {
if _, ok := gotShardIds[shardId]; !ok {
t.Fatalf("VolumeEcShardsInfo missing shard %d (per-disk layout=%v)", shardId, postReconcileLayout)
}
}
for _, n := range needles {
verifyHTTPRead(t, httpClient, clusterHarness.VolumeAdminURL(), n.fid, n.payload, "after-cross-disk-reconcile")
}

View File

@@ -0,0 +1,175 @@
package volume_server_grpc_test
import (
"context"
"net/http"
"sort"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/test/volume_server/framework"
"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// TestVolumeEcShardsInfoReturnsAllShardsAcrossDisks drives the full path
// behind the ec.encode source-deletion gate. A multi-disk volume server
// ends up with EC shards split across disks (each registers its own
// EcVolume entry in DiskLocation.ecVolumes), and the volume server's
// VolumeEcShardsInfo RPC must walk every DiskLocation rather than
// reporting whichever disk Store.FindEcVolume picks first.
//
// Pre-fix, verifyEcShardsBeforeDelete refused to delete source volumes —
// the shard-bitmap union across destinations fell short of dataShards +
// parityShards because each destination only reported shards on one of
// its disks. With the handler fix, the same VerifyShardsAcrossServers
// call returns a complete bitmap and the gate opens.
//
// Test flow: upload needles -> generate EC shards -> mount all shards ->
// delete the source volume -> stop the server -> move shards 0..splitAt-1
// to the second data dir -> restart -> assert the RPC and the worker's
// verification helpers both see every shard.
func TestVolumeEcShardsInfoReturnsAllShardsAcrossDisks(t *testing.T) {
// Integration test: skipped when -short is set.
if testing.Short() {
t.Skip("skipping integration test in short mode")
}
const (
dataDirCount = 2
volumeID = uint32(9558)
collection = "ec-multi-disk-verify"
)
clusterHarness := framework.StartSingleVolumeClusterWithDataDirs(t, matrix.P1(), dataDirCount)
dataDirs := clusterHarness.VolumeDataDirs()
// The rest of the test indexes dataDirs[0] and dataDirs[1], so fail
// early if the harness did not provide exactly two directories.
if len(dataDirs) != dataDirCount {
t.Fatalf("expected %d data dirs, got %d: %v", dataDirCount, len(dataDirs), dataDirs)
}
conn, grpcClient := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
defer conn.Close()
// A single 60-second deadline is shared by every RPC below, including
// the ones issued after the server restart.
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
framework.AllocateVolume(t, grpcClient, volumeID, collection)
httpClient := framework.NewHTTPClient()
// Three needles with distinct payloads.
// NOTE(review): bytesOfLen is defined outside this view; presumably
// bytesOfLen(n, b) yields n bytes of value b — confirm.
needles := []struct {
fid string
payload []byte
}{
{framework.NewFileID(volumeID, 9559, 0xC0FFEE01), bytesOfLen(64, 0xB1)},
{framework.NewFileID(volumeID, 9560, 0xC0FFEE02), bytesOfLen(8192, 0xB2)},
{framework.NewFileID(volumeID, 9561, 0xC0FFEE03), bytesOfLen(131072, 0xB3)},
}
// Upload each needle; the response body is drained and closed before
// the status code is checked, and anything other than 201 is fatal.
for _, n := range needles {
resp := framework.UploadBytes(t, httpClient, clusterHarness.VolumeAdminURL(), n.fid, n.payload)
_ = framework.ReadAllAndClose(t, resp)
if resp.StatusCode != http.StatusCreated {
t.Fatalf("upload %s expected 201, got %d", n.fid, resp.StatusCode)
}
}
if _, err := grpcClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
VolumeId: volumeID,
Collection: collection,
}); err != nil {
t.Fatalf("VolumeEcShardsGenerate: %v", err)
}
// Generate places every shard plus the .ecx/.ecj/.vif on the .dat's
// disk (disk 0). Mount all 14 there first so the next step's restart
// has a steady starting state.
allShards := make([]uint32, erasure_coding.TotalShardsCount)
for i := range allShards {
allShards[i] = uint32(i)
}
if _, err := grpcClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: volumeID,
Collection: collection,
ShardIds: allShards,
}); err != nil {
t.Fatalf("VolumeEcShardsMount all shards: %v", err)
}
// Drop the .dat so the EC shards are the only data path — mirrors the
// real ec.encode flow before verifyEcShardsBeforeDelete fires.
if _, err := grpcClient.VolumeDelete(ctx, &volume_server_pb.VolumeDeleteRequest{
VolumeId: volumeID,
}); err != nil {
t.Fatalf("VolumeDelete (drop .dat): %v", err)
}
// Move half the shards onto disk 1, leaving .ecx on disk 0. After
// restart, the cross-disk reconcile path attaches each disk's shards
// against its own EcVolume entry — the exact in-memory shape the bug
// reporter saw on a multi-disk destination.
//
// The server is stopped first so the files are relocated while no
// volume server process is running against them.
clusterHarness.StopVolumeServer()
// Shards 0..splitAt-1 go to dataDirs[1]; the remaining shards stay on
// dataDirs[0].
const splitAt = 7
for shard := 0; shard < splitAt; shard++ {
movedFile(t, dataDirs[0], dataDirs[1], collection, volumeID, erasure_coding.ToExt(shard))
}
// Guard the fixture itself: the scenario is only meaningful when the
// index file is absent from disk 1.
if fileExistsIn(dataDirs[1], collection, volumeID, ".ecx") {
t.Fatalf("setup: .ecx must stay on disk 0 to exercise the multi-disk path")
}
clusterHarness.RestartVolumeServer()
// Dial again after the restart; all post-restart RPCs go through
// grpcClient2.
conn2, grpcClient2 := framework.DialVolumeServer(t, clusterHarness.VolumeGRPCAddress())
defer conn2.Close()
// Sanity-check the on-disk layout before asserting on the RPC: all
// shards must still exist, and both disks must hold at least one.
postReconcileLayout := scanShardLayout(t, dataDirs, collection, volumeID)
if got, want := totalShardsInLayout(postReconcileLayout), erasure_coding.TotalShardsCount; got != want {
t.Fatalf("post-reconcile: total shards on disk mismatch: got %d, want %d (layout=%v)", got, want, postReconcileLayout)
}
if len(postReconcileLayout[0]) == 0 || len(postReconcileLayout[1]) == 0 {
t.Fatalf("post-reconcile: expected shards on BOTH disks, got per-disk layout %v", postReconcileLayout)
}
// Direct RPC assertion: VolumeEcShardsInfo must report every shard
// the server holds, not just the ones registered against the first
// matching DiskLocation.
infoResp, err := grpcClient2.VolumeEcShardsInfo(ctx, &volume_server_pb.VolumeEcShardsInfoRequest{
VolumeId: volumeID,
})
if err != nil {
t.Fatalf("VolumeEcShardsInfo: %v", err)
}
// Collect the reported shard ids. A wrong VolumeId is recorded with
// Errorf (non-fatal) so the shard-id comparison below still runs.
gotShardIds := make([]int, 0, len(infoResp.GetEcShardInfos()))
for _, info := range infoResp.GetEcShardInfos() {
if info.GetVolumeId() != volumeID {
t.Errorf("EcShardInfo VolumeId=%d, want %d", info.GetVolumeId(), volumeID)
}
gotShardIds = append(gotShardIds, int(info.GetShardId()))
}
// Sort so the comparison does not depend on the order in which the
// server emitted the shards.
sort.Ints(gotShardIds)
// Expected ids are 0..TotalShardsCount-1 in ascending order.
wantShardIds := make([]int, erasure_coding.TotalShardsCount)
for i := range wantShardIds {
wantShardIds[i] = i
}
// Length check first: it also makes the indexed comparison below safe.
if len(gotShardIds) != len(wantShardIds) {
t.Fatalf("VolumeEcShardsInfo returned %d shards (ids=%v), want %d (ids=%v) — per-disk layout=%v",
len(gotShardIds), gotShardIds, len(wantShardIds), wantShardIds, postReconcileLayout)
}
// With equal lengths and sorted input, a missing or duplicated id
// shows up as a position mismatch here.
for i, sid := range wantShardIds {
if gotShardIds[i] != sid {
t.Fatalf("VolumeEcShardsInfo shard ids=%v, want %v (per-disk layout=%v)",
gotShardIds, wantShardIds, postReconcileLayout)
}
}
// End-to-end assertion via the same helper the worker uses to gate
// source-volume deletion (weed/worker/tasks/erasure_coding/ec_task.go
// verifyEcShardsBeforeDelete). The union across destinations is what
// RequireFullShardSet measures; with one destination that holds every
// shard, the union must cover dataShards + parityShards.
dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
servers := []string{clusterHarness.VolumeServerAddress()}
union, perServer := erasure_coding.VerifyShardsAcrossServers(ctx, volumeID, servers, dialOption)
if err := erasure_coding.RequireFullShardSet(volumeID, union, erasure_coding.TotalShardsCount); err != nil {
t.Fatalf("verifyEcShardsBeforeDelete-equivalent gate failed: %v\nper-server inventory: %s\nper-disk layout: %v",
err, erasure_coding.SummarizeShardInventory(perServer), postReconcileLayout)
}
// Independently of the gate helper, the union bitmap itself must count
// exactly TotalShardsCount shards.
if got, want := union.Count(), erasure_coding.TotalShardsCount; got != want {
t.Fatalf("VerifyShardsAcrossServers union covered %d/%d shards (per-server=%s, layout=%v)",
got, want, erasure_coding.SummarizeShardInventory(perServer), postReconcileLayout)
}
}

View File

@@ -710,21 +710,39 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
func (vs *VolumeServer) VolumeEcShardsInfo(ctx context.Context, req *volume_server_pb.VolumeEcShardsInfoRequest) (*volume_server_pb.VolumeEcShardsInfoResponse, error) {
glog.V(0).Infof("VolumeEcShardsInfo: volume %d", req.VolumeId)
glog.V(0).Infof("VolumeEcStatus: %v", req)
vid := needle.VolumeId(req.GetVolumeId())
ecv, found := vs.store.FindEcVolume(vid)
if !found {
return nil, fmt.Errorf("VolumeEcStatus: EC volume %d not found", vid)
}
shardInfos := make([]*volume_server_pb.EcShardInfo, len(ecv.Shards))
for i, s := range ecv.Shards {
shardInfos[i] = s.ToEcShardInfo()
// Multi-disk volume servers register one EcVolume per DiskLocation
// that holds shards for the same vid: shards may be spread across
// disks while the .ecx lives on whichever disk owned the original
// .dat. Walk every DiskLocation here so the response reflects the
// full local shard set; the per-disk ecVolumesLock is taken inside
// DiskLocation.FindEcVolume.
var primary *erasure_coding.EcVolume
var seenShards erasure_coding.ShardBits
shardInfos := make([]*volume_server_pb.EcShardInfo, 0, erasure_coding.MaxShardCount)
for _, location := range vs.store.Locations {
ecv, ok := location.FindEcVolume(vid)
if !ok {
continue
}
if primary == nil {
primary = ecv
}
for _, s := range ecv.Shards {
if seenShards.Has(s.ShardId) {
continue
}
seenShards = seenShards.Set(s.ShardId)
shardInfos = append(shardInfos, s.ToEcShardInfo())
}
}
if primary == nil {
return nil, fmt.Errorf("VolumeEcShardsInfo: EC volume %d not found", vid)
}
var files, filesDeleted, totalSize uint64
err := ecv.WalkIndex(func(_ types.NeedleId, _ types.Offset, size types.Size) error {
err := primary.WalkIndex(func(_ types.NeedleId, _ types.Offset, size types.Size) error {
// deleted files are counted when computing EC volume sizes. this aligns with VolumeStatus(),
// which reports the raw data backend file size, regardless of deleted files.
totalSize += uint64(size.Raw())

View File

@@ -1,11 +1,19 @@
package weed_server
import (
"context"
"os"
"path/filepath"
"sort"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
"github.com/seaweedfs/seaweedfs/weed/util"
)
func TestCheckEcVolumeStatusCountOnlyDataShards(t *testing.T) {
@@ -54,3 +62,141 @@ func TestCheckEcVolumeStatusCountOnlyDataShards(t *testing.T) {
t.Fatalf("expected shardCount=3, got %d", shardCount)
}
}
// TestVolumeEcShardsInfo_AggregatesAcrossDisks pins the multi-disk path:
// when a volume server mounts EC shards for the same volume on more than
// one disk (each disk holds its own EcVolume entry — Store.FindEcVolume
// returns only the first), VolumeEcShardsInfo used to report shards from
// a single disk. The ec.encode verification step (verifyEcShardsBeforeDelete)
// then refused to delete the source volume because the union across
// servers fell short of dataShards + parityShards. The handler must walk
// every DiskLocation so the response covers every shard the server holds.
//
// The fixture is built directly on the filesystem under t.TempDir(): two
// disk directories, index/metadata files on the first one only, and two
// one-byte shard files on each. The handler is then invoked in-process on
// a VolumeServer that has only its store field set.
func TestVolumeEcShardsInfo_AggregatesAcrossDisks(t *testing.T) {
tempDir := t.TempDir()
dir0 := filepath.Join(tempDir, "disk0")
dir1 := filepath.Join(tempDir, "disk1")
for _, d := range []string{dir0, dir1} {
if err := os.MkdirAll(d, 0o755); err != nil {
t.Fatalf("mkdir %s: %v", d, err)
}
}
const collection = "ec-multi-disk-info"
vid := needle.VolumeId(42)
const dataShards, parityShards = 10, 4
const datSize int64 = 10 * 1024 * 1024
// Two shards on disk0, two on disk1. The .ecx / .ecj / .vif live on
// disk0 so each disk's EcVolume can open the index files via the
// cross-disk fallback in NewEcVolume.
shardsOnDisk0 := []erasure_coding.ShardId{0, 5}
shardsOnDisk1 := []erasure_coding.ShardId{7, 12}
// One store spanning both directories; the per-directory slices
// (max counts, min free space, disk types) each carry two entries,
// one per directory.
store := storage.NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id",
[]string{dir0, dir1},
[]int32{100, 100},
[]util.MinFreeSpace{{}, {}},
"",
storage.NeedleMapInMemory,
[]types.DiskType{types.HardDriveType, types.HardDriveType},
nil,
3,
)
// Background drainer: keeps receiving from the store's notification
// channels and discards the values, exiting once done is closed.
// Presumably this keeps store operations from blocking on a send when
// nothing else consumes these channels — confirm against Store.
done := make(chan struct{})
go func() {
for {
select {
case <-store.NewEcShardsChan:
case <-store.NewVolumesChan:
case <-store.DeletedVolumesChan:
case <-store.DeletedEcShardsChan:
case <-store.StateUpdateChan:
case <-done:
return
}
}
}()
// Close the store while the drainer is still running, then stop the
// drainer.
t.Cleanup(func() {
store.Close()
close(done)
})
// Per-disk base paths for this volume's EC files; extensions are
// appended to them below.
base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid))
base1 := erasure_coding.EcShardFileName(collection, dir1, int(vid))
// .ecx, .ecj, .vif live on disk0. NewEcVolume on disk1 falls back to
// disk0's idx dir. The .ecx needs >0 bytes so HasEcxFileOnDisk does
// not treat it as the corrupt-stub case; a single zero entry is the
// smallest valid index file (WalkIndex iterates one zero-sized needle).
if err := os.WriteFile(base0+".ecx", make([]byte, 16), 0o644); err != nil {
t.Fatalf("write .ecx: %v", err)
}
if err := os.WriteFile(base0+".ecj", nil, 0o644); err != nil {
t.Fatalf("write .ecj: %v", err)
}
if err := volume_info.SaveVolumeInfo(base0+".vif", &volume_server_pb.VolumeInfo{
Version: uint32(needle.Version3),
DatFileSize: datSize,
EcShardConfig: &volume_server_pb.EcShardConfig{
DataShards: dataShards,
ParityShards: parityShards,
},
}); err != nil {
t.Fatalf("save .vif: %v", err)
}
// plant creates the shard file for shardId next to base and sizes it
// to one byte. Any failure is fatal for the test.
plant := func(base string, shardId erasure_coding.ShardId) {
t.Helper()
f, err := os.Create(base + erasure_coding.ToExt(int(shardId)))
if err != nil {
t.Fatalf("create shard %d: %v", shardId, err)
}
// MountEcShards does not validate shard size, so any non-empty
// truncate avoids the zero-byte ignore branch in loadAllEcShards.
if err := f.Truncate(1); err != nil {
f.Close()
t.Fatalf("truncate shard %d: %v", shardId, err)
}
f.Close()
}
for _, sid := range shardsOnDisk0 {
plant(base0, sid)
}
for _, sid := range shardsOnDisk1 {
plant(base1, sid)
}
// Mount disk0's shards first, then disk1's. The outer append copies
// into a fresh slice before ranging over the combined list.
for _, sid := range append([]erasure_coding.ShardId{}, append(shardsOnDisk0, shardsOnDisk1...)...) {
if err := store.MountEcShards(collection, vid, sid, ""); err != nil {
t.Fatalf("MountEcShards %d.%d: %v", vid, sid, err)
}
}
// Call the handler directly; store is the only VolumeServer field
// this test sets.
vs := &VolumeServer{store: store}
resp, err := vs.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{
VolumeId: uint32(vid),
})
if err != nil {
t.Fatalf("VolumeEcShardsInfo: %v", err)
}
// Collect the reported shard ids. A wrong VolumeId is recorded with
// Errorf (non-fatal) so the id comparison below still runs.
gotShardIds := make([]int, 0, len(resp.GetEcShardInfos()))
for _, info := range resp.GetEcShardInfos() {
if info.GetVolumeId() != uint32(vid) {
t.Errorf("EcShardInfo VolumeId=%d, want %d", info.GetVolumeId(), vid)
}
gotShardIds = append(gotShardIds, int(info.GetShardId()))
}
// Sort so the comparison does not depend on the order in which the
// handler visited the disks.
sort.Ints(gotShardIds)
// Expected result is the sorted union of shardsOnDisk0 and
// shardsOnDisk1.
want := []int{0, 5, 7, 12}
// Length check first: it also makes the indexed comparison below safe.
if len(gotShardIds) != len(want) {
t.Fatalf("VolumeEcShardsInfo returned %d shards (ids=%v), want %d (ids=%v)",
len(gotShardIds), gotShardIds, len(want), want)
}
for i, sid := range want {
if gotShardIds[i] != sid {
t.Fatalf("VolumeEcShardsInfo shard ids=%v, want %v", gotShardIds, want)
}
}
}