diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index 30bec6791..c41afed38 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -102,12 +102,16 @@ func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.S // the orphan-shard layout reported in #9212. func (l *DiskLocation) HasEcxFileOnDisk(collection string, vid needle.VolumeId) bool { idxBase := erasure_coding.EcShardFileName(collection, l.IdxDirectory, int(vid)) - if info, err := os.Stat(idxBase + ".ecx"); err == nil && !info.IsDir() { + // A 0-byte .ecx is a corrupt stub left by a failed EC distribute copy; + // it cannot drive mount and must not steer placement decisions toward + // this disk. Treat it as absent so the caller falls through to a + // sibling disk that may hold a valid index. + if info, err := os.Stat(idxBase + ".ecx"); err == nil && !info.IsDir() && info.Size() > 0 { return true } if l.IdxDirectory != l.Directory { dataBase := erasure_coding.EcShardFileName(collection, l.Directory, int(vid)) - if info, err := os.Stat(dataBase + ".ecx"); err == nil && !info.IsDir() { + if info, err := os.Stat(dataBase + ".ecx"); err == nil && !info.IsDir() && info.Size() > 0 { return true } } @@ -130,7 +134,7 @@ func (l *DiskLocation) loadEcShardWithIdxDir(collection string, vid needle.Volum if err == os.ErrNotExist { return nil, os.ErrNotExist } - return nil, fmt.Errorf("failed to create ec shard %d.%d: %v", vid, shardId, err) + return nil, fmt.Errorf("failed to create ec shard %d.%d: %w", vid, shardId, err) } l.ecVolumesLock.Lock() defer l.ecVolumesLock.Unlock() @@ -138,7 +142,10 @@ func (l *DiskLocation) loadEcShardWithIdxDir(collection string, vid needle.Volum if !found { ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, idxDir, collection, vid) if err != nil { - return nil, fmt.Errorf("failed to create ec volume %d: %v", vid, err) + // Wrap with %w so MountEcShards / startup reconcile can use + // errors.Is(err, os.ErrNotExist) to decide whether to try the + // next local disk vs. surface the failure. + return nil, fmt.Errorf("failed to create ec volume %d: %w", vid, err) } l.ecVolumes[vid] = ecVolume } diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index fa24d2118..fd5e5b5db 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -67,7 +67,15 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection dataBaseFileName := EcShardFileName(collection, dir, int(vid)) indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid)) - // open ecx file + // open ecx file. Wrap errors with %w so callers walking up the stack + // (notably Store.MountEcShards) can use errors.Is(err, os.ErrNotExist) + // to decide whether to try the next local disk vs. bail. A 0-byte .ecx + // is a legitimate index for a volume that had no live needles at encode + // time (e.g. all needles deleted before WriteSortedFileFromIdx) and + // must mount successfully here. A 0-byte stub left by a failed copy + // stream is indistinguishable from that empty case by file size alone; + // preventing such stubs is the receiver-side cleanup in writeToFile's + // job, not this open path. ev.ecxActualDir = dirIdx if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil { if dirIdx != dir && os.IsNotExist(err) { @@ -75,18 +83,23 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection firstErr := err glog.V(1).Infof("ecx file not found at %s.ecx, falling back to %s.ecx", indexBaseFileName, dataBaseFileName) if ev.ecxFile, err = os.OpenFile(dataBaseFileName+".ecx", os.O_RDWR, 0644); err != nil { - return nil, fmt.Errorf("open ecx index %s.ecx: %v; fallback %s.ecx: %v", indexBaseFileName, firstErr, dataBaseFileName, err) + if os.IsNotExist(err) { + return nil, fmt.Errorf("open ecx index %s.ecx (fallback %s.ecx): %w", indexBaseFileName, dataBaseFileName, os.ErrNotExist) + } + return nil, fmt.Errorf("open ecx index %s.ecx: %v; fallback %s.ecx: %w", indexBaseFileName, firstErr, dataBaseFileName, err) } indexBaseFileName = dataBaseFileName ev.ecxActualDir = dir + } else if os.IsNotExist(err) { + return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %w", indexBaseFileName, os.ErrNotExist) } else { - return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", indexBaseFileName, err) + return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %w", indexBaseFileName, err) } } ecxFi, statErr := ev.ecxFile.Stat() if statErr != nil { _ = ev.ecxFile.Close() - return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", indexBaseFileName, statErr) + return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %w", indexBaseFileName, statErr) } ev.ecxFileSize = ecxFi.Size() ev.ecxCreatedAt = ecxFi.ModTime() diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index cef80faf9..8d9cb695d 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -164,13 +164,25 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er // holding the .ec?? shard being mounted: ec.balance / ec.rebuild can // place the .ecx on one local disk while later distributing shards // across sibling disks of the same volume server. The per-disk - // IdxDirectory used by LoadEcShard would ENOENT the .ecx and fail - // with "cannot open ec volume index" instead of returning - // ErrNotExist, so the loop below would bail before trying other - // disks. Look up the .ecx owner across all DiskLocations once and - // route NewEcVolume at the directory that actually has the file. + // IdxDirectory used by LoadEcShard would ENOENT the .ecx, so look up + // the .ecx owner across all DiskLocations once and route NewEcVolume + // at the directory that actually has the file. A 0-byte .ecx is + // treated as missing here (writeToFile can leave a stub on a failed + // EC distribute) so we still scan the rest of the disks. ecxIdxDir, ecxFound := s.findEcxIdxDirForVolume(collection, vid) + // Collect failures so an all-disks-fail return reports every disk we + // tried rather than just the first one. Before this loop reordered + // itself to keep going after the first non-ENOENT error, a single + // shard-on-disk-without-.ecx situation would bail the loop and the + // operator saw "cannot open ec volume index" naming exactly one disk + // even when others held a valid index. + type diskError struct { + dir string + err error + } + var failures []diskError + for diskId, location := range s.Locations { idxDir := location.IdxDirectory if ecxFound { @@ -185,7 +197,8 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er } } } - if ecVolume, err := location.loadEcShardWithIdxDir(collection, vid, shardId, idxDir); err == nil { + ecVolume, err := location.loadEcShardWithIdxDir(collection, vid, shardId, idxDir) + if err == nil { glog.V(0).Infof("MountEcShards %d.%d on disk ID %d", vid, shardId, diskId) // Apply the orchestrator-supplied source disk type so the EC @@ -207,14 +220,35 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er DiskId: uint32(diskId), } return nil - } else if err == os.ErrNotExist { - continue - } else { - return fmt.Errorf("%s load ec shard %d.%d: %v", location.Directory, vid, shardId, err) } + if errors.Is(err, os.ErrNotExist) { + // Shard or index not on this disk; another disk may own it. + continue + } + failures = append(failures, diskError{dir: location.Directory, err: err}) } - return fmt.Errorf("MountEcShards %d.%d not found on disk", vid, shardId) + if len(failures) == 0 { + // No disk had the shard or the index; this volume server is not + // holding any artefacts for the requested shard. Name what we + // scanned for so the operator can tell "no .ecx anywhere" apart + // from "shard not on this server". + if !ecxFound { + return fmt.Errorf("MountEcShards %d.%d: no .ecx index found on any local disk", vid, shardId) + } + return fmt.Errorf("MountEcShards %d.%d not found on disk", vid, shardId) + } + // Some disks returned a real (non-ENOENT) error. Report them all so + // the caller can see whether the failures cluster around one disk + // (likely hardware) or are spread out (likely a config problem). + var b []byte + for i, f := range failures { + if i > 0 { + b = append(b, "; "...) + } + b = append(b, fmt.Sprintf("%s: %v", f.dir, f.err)...) + } + return fmt.Errorf("MountEcShards %d.%d load failures: %s", vid, shardId, string(b)) } func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error { diff --git a/weed/storage/store_ec_mount_cross_disk_test.go b/weed/storage/store_ec_mount_cross_disk_test.go index 08743f345..20790574a 100644 --- a/weed/storage/store_ec_mount_cross_disk_test.go +++ b/weed/storage/store_ec_mount_cross_disk_test.go @@ -236,3 +236,239 @@ func TestMountEcShards_SameDiskEcxStillWorks(t *testing.T) { t.Errorf("EcVolume .ecx resolved at %q, want directory %q (same disk as shard)", got, want) } } + +// startEcMountStore plants two empty disk directories and returns a Store +// configured over them, draining the announcement channels in the +// background so MountEcShards does not block. Shared setup for the +// missing-.ecx and 0-byte-.ecx tests below. +func startEcMountStore(t *testing.T, dirs []string) *Store { + t.Helper() + for _, d := range dirs { + if err := os.MkdirAll(d, 0o755); err != nil { + t.Fatalf("mkdir %s: %v", d, err) + } + } + store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id", + dirs, + []int32{100, 100}, + []util.MinFreeSpace{{}, {}}, + "", + NeedleMapInMemory, + []types.DiskType{types.HardDriveType, types.HardDriveType}, + nil, + 3, + ) + done := make(chan struct{}) + go func() { + for { + select { + case <-store.NewEcShardsChan: + case <-store.NewVolumesChan: + case <-store.DeletedVolumesChan: + case <-store.DeletedEcShardsChan: + case <-store.StateUpdateChan: + case <-done: + return + } + } + }() + t.Cleanup(func() { + store.Close() + close(done) + }) + return store +} + +// TestMountEcShards_MissingEcxOnAllDisks covers the case where the EC +// distribute step has dropped shards on the volume server but no .ecx +// ever arrived (the operator-reported "Node 1 has 17 shard files but +// zero .ecx" pattern). MountEcShards used to bail on the first disk's +// "cannot open ec volume index" error — which is not os.ErrNotExist +// because NewEcVolume wrapped it with %v — and the operator saw a +// confusing per-disk error. The mount path now keeps scanning every +// local disk and, when none own an index, returns one clear error. +func TestMountEcShards_MissingEcxOnAllDisks(t *testing.T) { + tempDir := t.TempDir() + dir0 := filepath.Join(tempDir, "disk0") + dir1 := filepath.Join(tempDir, "disk1") + store := startEcMountStore(t, []string{dir0, dir1}) + + const collection = "mybucket" + vid := needle.VolumeId(11) + const dataShards = 10 + const datSize int64 = 10 * 1024 * 1024 + expectedShardSize := calculateExpectedShardSize(datSize, dataShards) + const shardOnDisk0 erasure_coding.ShardId = 6 + + // Plant a shard file on disk0 but no .ecx anywhere. + base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid)) + f, err := os.Create(base0 + erasure_coding.ToExt(int(shardOnDisk0))) + if err != nil { + t.Fatalf("create shard: %v", err) + } + if err := f.Truncate(expectedShardSize); err != nil { + f.Close() + t.Fatalf("truncate shard: %v", err) + } + f.Close() + + err = store.MountEcShards(collection, vid, shardOnDisk0, "") + if err == nil { + t.Fatalf("MountEcShards should fail when no .ecx exists on any disk") + } + if !strings.Contains(err.Error(), "no .ecx index found on any local disk") { + t.Errorf("expected aggregated 'no .ecx' error, got: %v", err) + } +} + +// TestMountEcShards_ZeroByteEcxIsIgnored covers the user's "0-byte .ecx +// on one disk, valid .ecx on a sibling disk" case. writeToFile uses +// O_TRUNC and can leave an empty stub on a failed copy stream during EC +// distribute. The mount path must skip the stub and use the valid index +// from the sibling disk, not fail with "cannot open ec volume index". +func TestMountEcShards_ZeroByteEcxIsIgnored(t *testing.T) { + tempDir := t.TempDir() + dir0 := filepath.Join(tempDir, "disk0") + dir1 := filepath.Join(tempDir, "disk1") + store := startEcMountStore(t, []string{dir0, dir1}) + + const collection = "mybucket" + vid := needle.VolumeId(12) + const dataShards, parityShards = 10, 4 + const datSize int64 = 10 * 1024 * 1024 + expectedShardSize := calculateExpectedShardSize(datSize, dataShards) + const shardOnDisk0 erasure_coding.ShardId = 7 + + // shard on disk0; 0-byte .ecx stub on disk0; valid .ecx on disk1. + base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid)) + f, err := os.Create(base0 + erasure_coding.ToExt(int(shardOnDisk0))) + if err != nil { + t.Fatalf("create shard: %v", err) + } + if err := f.Truncate(expectedShardSize); err != nil { + f.Close() + t.Fatalf("truncate shard: %v", err) + } + f.Close() + if err := os.WriteFile(base0+".ecx", nil, 0o644); err != nil { + t.Fatalf("write 0-byte .ecx stub on disk0: %v", err) + } + + base1 := erasure_coding.EcShardFileName(collection, dir1, int(vid)) + if err := os.WriteFile(base1+".ecx", make([]byte, 20), 0o644); err != nil { + t.Fatalf("write valid .ecx on disk1: %v", err) + } + if err := os.WriteFile(base1+".ecj", nil, 0o644); err != nil { + t.Fatalf("write .ecj: %v", err) + } + if err := volume_info.SaveVolumeInfo(base1+".vif", &volume_server_pb.VolumeInfo{ + Version: uint32(needle.Version3), + DatFileSize: datSize, + EcShardConfig: &volume_server_pb.EcShardConfig{ + DataShards: dataShards, + ParityShards: parityShards, + }, + }); err != nil { + t.Fatalf("save .vif: %v", err) + } + + if err := store.MountEcShards(collection, vid, shardOnDisk0, ""); err != nil { + t.Fatalf("MountEcShards should ignore the 0-byte .ecx stub and use disk1's valid index: %v", err) + } + + loc0 := store.Locations[0] + ev, found := loc0.FindEcVolume(vid) + if !found { + t.Fatalf("EC volume %d not found on disk0", vid) + } + if got, want := filepath.Dir(ev.FileName(".ecx")), dir1; got != want { + t.Errorf("EcVolume .ecx resolved at %q, want %q (sibling disk with valid index)", got, want) + } +} + +// TestMountEcShards_EmptyEcxMountsSuccessfully pins the empty-volume +// case: when an EC volume's .idx had no live entries at encode time, +// WriteSortedFileFromIdx legitimately produces a 0-byte .ecx. Mount +// must accept that as a valid empty index and let the EC volume come +// online — there is no reliable way to distinguish a legitimately +// empty .ecx from a stub left by a failed copy, and preventing the +// stub case is writeToFile's job at copy time, not NewEcVolume's at +// mount time. Pre-fix this returned "no .ecx index found on any local +// disk" because NewEcVolume mapped 0-byte to os.ErrNotExist. +func TestMountEcShards_EmptyEcxMountsSuccessfully(t *testing.T) { + tempDir := t.TempDir() + dir0 := filepath.Join(tempDir, "disk0") + dir1 := filepath.Join(tempDir, "disk1") + store := startEcMountStore(t, []string{dir0, dir1}) + + const collection = "mybucket" + vid := needle.VolumeId(13) + const dataShards, parityShards = 10, 4 + const datSize int64 = 0 + const shardOnDisk0 erasure_coding.ShardId = 0 + + // Plant a 0-byte shard, a 0-byte .ecx, an empty .ecj, and a .vif — + // the on-disk layout after encoding a volume that had no live needles. + base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid)) + if err := os.WriteFile(base0+erasure_coding.ToExt(int(shardOnDisk0)), nil, 0o644); err != nil { + t.Fatalf("write 0-byte shard: %v", err) + } + if err := os.WriteFile(base0+".ecx", nil, 0o644); err != nil { + t.Fatalf("write 0-byte .ecx (empty volume's legitimate empty index): %v", err) + } + if err := os.WriteFile(base0+".ecj", nil, 0o644); err != nil { + t.Fatalf("write .ecj: %v", err) + } + if err := volume_info.SaveVolumeInfo(base0+".vif", &volume_server_pb.VolumeInfo{ + Version: uint32(needle.Version3), + DatFileSize: datSize, + EcShardConfig: &volume_server_pb.EcShardConfig{ + DataShards: dataShards, + ParityShards: parityShards, + }, + }); err != nil { + t.Fatalf("save .vif: %v", err) + } + + if err := store.MountEcShards(collection, vid, shardOnDisk0, ""); err != nil { + t.Fatalf("MountEcShards should accept a 0-byte .ecx as a valid empty index: %v", err) + } + + loc0 := store.Locations[0] + if _, found := loc0.FindEcVolume(vid); !found { + t.Errorf("EC volume %d expected to be loaded on disk0 after mount", vid) + } +} + +// TestIndexEcxOwners_IgnoresZeroByteStub guards the reconcile owner +// selection: a 0-byte .ecx on the orphan disk must not win against a +// valid .ecx on a sibling disk, otherwise reconcileEcShardsAcrossDisks +// would point loaders at the stub and leave orphan shards unloaded. +// Same invariant as findEcxIdxDirForVolume, but for the bulk scan path. +func TestIndexEcxOwners_IgnoresZeroByteStub(t *testing.T) { + tempDir := t.TempDir() + dir0 := filepath.Join(tempDir, "disk0") // 0-byte .ecx stub + dir1 := filepath.Join(tempDir, "disk1") // valid .ecx + store := startEcMountStore(t, []string{dir0, dir1}) + + const collection = "mybucket" + vid := needle.VolumeId(99) + + base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid)) + if err := os.WriteFile(base0+".ecx", nil, 0o644); err != nil { + t.Fatalf("write 0-byte .ecx stub on disk0: %v", err) + } + base1 := erasure_coding.EcShardFileName(collection, dir1, int(vid)) + if err := os.WriteFile(base1+".ecx", make([]byte, 20), 0o644); err != nil { + t.Fatalf("write valid .ecx on disk1: %v", err) + } + + owners := store.indexEcxOwners() + owner, ok := owners[ecKeyForReconcile{collection: collection, vid: vid}] + if !ok { + t.Fatalf("indexEcxOwners did not find any owner; expected disk1 with the valid .ecx") + } + if owner.location != store.Locations[1] { + t.Errorf("indexEcxOwners chose disk %s as owner; want disk1 (which holds the valid .ecx, not the 0-byte stub)", owner.location.Directory) + } +} diff --git a/weed/storage/store_ec_reconcile.go b/weed/storage/store_ec_reconcile.go index 0ca3e1f91..391a4d451 100644 --- a/weed/storage/store_ec_reconcile.go +++ b/weed/storage/store_ec_reconcile.go @@ -127,7 +127,11 @@ func (s *Store) findEcxIdxDirForVolume(collection string, vid needle.VolumeId) ( } seen[scan] = true base := erasure_coding.EcShardFileName(collection, scan, int(vid)) - if info, err := os.Stat(base + ".ecx"); err == nil && !info.IsDir() { + // A 0-byte .ecx is not a usable index — EC distribute's writeToFile + // opens with O_TRUNC and can leave a stub on a mid-stream failure. + // Treat it the same as absent so the scan continues to a sibling + // disk that may hold a valid index. + if info, err := os.Stat(base + ".ecx"); err == nil && !info.IsDir() && info.Size() > 0 { return scan, true } } @@ -163,6 +167,17 @@ func (s *Store) indexEcxOwners() map[ecKeyForReconcile]ecxOwnerInfo { if !strings.HasSuffix(name, ".ecx") { continue } + // A 0-byte .ecx is a corrupt stub from a failed copy and + // not a credible owner — skip it so the scan keeps looking + // for a real index on a sibling disk. Without this, an + // orphan-shard reconcile could pick the stub as owner and + // point NewEcVolume at it, which now fails by design + // (NewEcVolume rejects 0-byte .ecx), leaving the orphan + // shards unloaded even when a valid index exists nearby. + info, statErr := entry.Info() + if statErr != nil || info.Size() == 0 { + continue + } base := name[:len(name)-len(".ecx")] collection, vid, err := parseCollectionVolumeId(base) if err != nil {