fix(ec_mount): reject 0-byte .ecx and aggregate cross-disk failures (#9542)

* fix(ec_mount): reject 0-byte .ecx and aggregate cross-disk failures

MountEcShards's per-disk loop bailed on the first disk returning a
non-ENOENT error, and NewEcVolume wrapped its ENOENT with %v so the
caller's `err == os.ErrNotExist` check never matched. On a multi-disk
volume server where ec.balance / ec.rebuild had distributed shards
across sibling disks while the matching .ecx never arrived, the mount
loop bailed after disk 0 with "cannot open ec volume index" and the
operator never saw that the rest of the disks were also empty. The
companion failure mode is a 0-byte .ecx stub left by EC distribute's
writeToFile after a mid-stream copy failure: Stat() succeeds, treating
the stub as a valid index, and downstream mount work proceeds against
an empty file.

Wrap the ec-volume open errors with %w, treat a 0-byte .ecx as
os.ErrNotExist (in NewEcVolume, findEcxIdxDirForVolume, and
HasEcxFileOnDisk), and have MountEcShards collect per-disk failures
before returning a single aggregated error. The "no .ecx anywhere"
case gets a distinct error so the orchestrator can re-copy the index
from a healthy replica rather than retry against the same broken
state.

* fix(ec_reconcile): indexEcxOwners also rejects 0-byte .ecx stubs

findEcxIdxDirForVolume already skipped 0-byte .ecx during MountEcShards,
but indexEcxOwners (used by reconcileEcShardsAcrossDisks at startup)
still recorded the first .ecx by name only. On a store where one disk
holds a 0-byte stub left by a failed EC distribute and a sibling disk
holds the real index, the stub would win the owner selection — and
NewEcVolume's new size check would then refuse to load against it,
leaving the orphan shards unloaded even though a valid index exists.

Mirror the size check from findEcxIdxDirForVolume: skip directory
entries whose .ecx Info() reports size 0 or whose Info() call fails.

* fix(ec_mount): accept 0-byte .ecx as valid empty index

The previous commit treated a 0-byte .ecx in NewEcVolume as
os.ErrNotExist, on the assumption that any empty .ecx was a stub left
by a failed copy stream. That broke the legitimate empty-volume case:
when an EC volume's source .idx has no live entries (e.g. all needles
deleted before WriteSortedFileFromIdx), the sorted .ecx is genuinely
0 bytes and must mount. The integration test
TestEcShardsToVolumeMissingShardAndNoLiveEntries fails with
"MountEcShards: no .ecx index found on any local disk" because the
mount path now refuses the legitimate empty index.

A 0-byte .ecx left by a failed copy stream is indistinguishable from
the legitimate empty case by file size alone. Preventing stub files
from being written is the receiver-side cleanup in writeToFile's job
(the companion EC distribute PR), not NewEcVolume's at mount time.

The cross-disk lookup helpers (findEcxIdxDirForVolume, HasEcxFileOnDisk,
indexEcxOwners) keep their size > 0 preference: when a real .ecx
exists on a sibling disk alongside a stub, we still want to route
mounts and reconcile at the real one. If no non-zero .ecx exists
anywhere, the per-disk fallback in MountEcShards can still open the
0-byte .ecx and the volume mounts.

Replace TestMountEcShards_ZeroByteEcxOnlyDisk with
TestMountEcShards_EmptyEcxMountsSuccessfully, which pins the
empty-volume invariant.
This commit is contained in:
Chris Lu
2026-05-18 15:00:33 -07:00
committed by GitHub
parent 41b6ad002b
commit af8d4e00ee
5 changed files with 325 additions and 20 deletions

View File

@@ -102,12 +102,16 @@ func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.S
// the orphan-shard layout reported in #9212.
func (l *DiskLocation) HasEcxFileOnDisk(collection string, vid needle.VolumeId) bool {
idxBase := erasure_coding.EcShardFileName(collection, l.IdxDirectory, int(vid))
if info, err := os.Stat(idxBase + ".ecx"); err == nil && !info.IsDir() {
// A 0-byte .ecx is a corrupt stub left by a failed EC distribute copy;
// it cannot drive mount and must not steer placement decisions toward
// this disk. Treat it as absent so the caller falls through to a
// sibling disk that may hold a valid index.
if info, err := os.Stat(idxBase + ".ecx"); err == nil && !info.IsDir() && info.Size() > 0 {
return true
}
if l.IdxDirectory != l.Directory {
dataBase := erasure_coding.EcShardFileName(collection, l.Directory, int(vid))
if info, err := os.Stat(dataBase + ".ecx"); err == nil && !info.IsDir() {
if info, err := os.Stat(dataBase + ".ecx"); err == nil && !info.IsDir() && info.Size() > 0 {
return true
}
}
@@ -130,7 +134,7 @@ func (l *DiskLocation) loadEcShardWithIdxDir(collection string, vid needle.Volum
if err == os.ErrNotExist {
return nil, os.ErrNotExist
}
return nil, fmt.Errorf("failed to create ec shard %d.%d: %v", vid, shardId, err)
return nil, fmt.Errorf("failed to create ec shard %d.%d: %w", vid, shardId, err)
}
l.ecVolumesLock.Lock()
defer l.ecVolumesLock.Unlock()
@@ -138,7 +142,10 @@ func (l *DiskLocation) loadEcShardWithIdxDir(collection string, vid needle.Volum
if !found {
ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, idxDir, collection, vid)
if err != nil {
return nil, fmt.Errorf("failed to create ec volume %d: %v", vid, err)
// Wrap with %w so MountEcShards / startup reconcile can use
// errors.Is(err, os.ErrNotExist) to decide whether to try the
// next local disk vs. surface the failure.
return nil, fmt.Errorf("failed to create ec volume %d: %w", vid, err)
}
l.ecVolumes[vid] = ecVolume
}

View File

@@ -67,7 +67,15 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection
dataBaseFileName := EcShardFileName(collection, dir, int(vid))
indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid))
// open ecx file
// open ecx file. Wrap errors with %w so callers walking up the stack
// (notably Store.MountEcShards) can use errors.Is(err, os.ErrNotExist)
// to decide whether to try the next local disk vs. bail. A 0-byte .ecx
// is a legitimate index for a volume that had no live needles at encode
// time (e.g. all needles deleted before WriteSortedFileFromIdx) and
// must mount successfully here. A 0-byte stub left by a failed copy
// stream is indistinguishable from that empty case by file size alone;
// preventing such stubs is the receiver-side cleanup in writeToFile's
// job, not this open path.
ev.ecxActualDir = dirIdx
if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil {
if dirIdx != dir && os.IsNotExist(err) {
@@ -75,18 +83,23 @@ func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection
firstErr := err
glog.V(1).Infof("ecx file not found at %s.ecx, falling back to %s.ecx", indexBaseFileName, dataBaseFileName)
if ev.ecxFile, err = os.OpenFile(dataBaseFileName+".ecx", os.O_RDWR, 0644); err != nil {
return nil, fmt.Errorf("open ecx index %s.ecx: %v; fallback %s.ecx: %v", indexBaseFileName, firstErr, dataBaseFileName, err)
if os.IsNotExist(err) {
return nil, fmt.Errorf("open ecx index %s.ecx (fallback %s.ecx): %w", indexBaseFileName, dataBaseFileName, os.ErrNotExist)
}
return nil, fmt.Errorf("open ecx index %s.ecx: %v; fallback %s.ecx: %w", indexBaseFileName, firstErr, dataBaseFileName, err)
}
indexBaseFileName = dataBaseFileName
ev.ecxActualDir = dir
} else if os.IsNotExist(err) {
return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %w", indexBaseFileName, os.ErrNotExist)
} else {
return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", indexBaseFileName, err)
return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %w", indexBaseFileName, err)
}
}
ecxFi, statErr := ev.ecxFile.Stat()
if statErr != nil {
_ = ev.ecxFile.Close()
return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", indexBaseFileName, statErr)
return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %w", indexBaseFileName, statErr)
}
ev.ecxFileSize = ecxFi.Size()
ev.ecxCreatedAt = ecxFi.ModTime()

View File

@@ -164,13 +164,25 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er
// holding the .ec?? shard being mounted: ec.balance / ec.rebuild can
// place the .ecx on one local disk while later distributing shards
// across sibling disks of the same volume server. The per-disk
// IdxDirectory used by LoadEcShard would ENOENT the .ecx and fail
// with "cannot open ec volume index" instead of returning
// ErrNotExist, so the loop below would bail before trying other
// disks. Look up the .ecx owner across all DiskLocations once and
// route NewEcVolume at the directory that actually has the file.
// IdxDirectory used by LoadEcShard would ENOENT the .ecx, so look up
// the .ecx owner across all DiskLocations once and route NewEcVolume
// at the directory that actually has the file. A 0-byte .ecx is
// treated as missing here (writeToFile can leave a stub on a failed
// EC distribute) so we still scan the rest of the disks.
ecxIdxDir, ecxFound := s.findEcxIdxDirForVolume(collection, vid)
// Collect failures so an all-disks-fail return reports every disk we
// tried rather than just the first one. Before this loop reordered
// itself to keep going after the first non-ENOENT error, a single
// shard-on-disk-without-.ecx situation would bail the loop and the
// operator saw "cannot open ec volume index" naming exactly one disk
// even when others held a valid index.
type diskError struct {
dir string
err error
}
var failures []diskError
for diskId, location := range s.Locations {
idxDir := location.IdxDirectory
if ecxFound {
@@ -185,7 +197,8 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er
}
}
}
if ecVolume, err := location.loadEcShardWithIdxDir(collection, vid, shardId, idxDir); err == nil {
ecVolume, err := location.loadEcShardWithIdxDir(collection, vid, shardId, idxDir)
if err == nil {
glog.V(0).Infof("MountEcShards %d.%d on disk ID %d", vid, shardId, diskId)
// Apply the orchestrator-supplied source disk type so the EC
@@ -207,14 +220,35 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er
DiskId: uint32(diskId),
}
return nil
} else if err == os.ErrNotExist {
continue
} else {
return fmt.Errorf("%s load ec shard %d.%d: %v", location.Directory, vid, shardId, err)
}
if errors.Is(err, os.ErrNotExist) {
// Shard or index not on this disk; another disk may own it.
continue
}
failures = append(failures, diskError{dir: location.Directory, err: err})
}
return fmt.Errorf("MountEcShards %d.%d not found on disk", vid, shardId)
if len(failures) == 0 {
// No disk had the shard or the index; this volume server is not
// holding any artefacts for the requested shard. Name what we
// scanned for so the operator can tell "no .ecx anywhere" apart
// from "shard not on this server".
if !ecxFound {
return fmt.Errorf("MountEcShards %d.%d: no .ecx index found on any local disk", vid, shardId)
}
return fmt.Errorf("MountEcShards %d.%d not found on disk", vid, shardId)
}
// Some disks returned a real (non-ENOENT) error. Report them all so
// the caller can see whether the failures cluster around one disk
// (likely hardware) or are spread out (likely a config problem).
var b []byte
for i, f := range failures {
if i > 0 {
b = append(b, "; "...)
}
b = append(b, fmt.Sprintf("%s: %v", f.dir, f.err)...)
}
return fmt.Errorf("MountEcShards %d.%d load failures: %s", vid, shardId, string(b))
}
func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.ShardId) error {

View File

@@ -236,3 +236,239 @@ func TestMountEcShards_SameDiskEcxStillWorks(t *testing.T) {
t.Errorf("EcVolume .ecx resolved at %q, want directory %q (same disk as shard)", got, want)
}
}
// startEcMountStore plants two empty disk directories and returns a Store
// configured over them, draining the announcement channels in the
// background so MountEcShards does not block. Shared setup for the
// missing-.ecx and 0-byte-.ecx tests below.
func startEcMountStore(t *testing.T, dirs []string) *Store {
t.Helper()
for _, d := range dirs {
if err := os.MkdirAll(d, 0o755); err != nil {
t.Fatalf("mkdir %s: %v", d, err)
}
}
store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id",
dirs,
[]int32{100, 100},
[]util.MinFreeSpace{{}, {}},
"",
NeedleMapInMemory,
[]types.DiskType{types.HardDriveType, types.HardDriveType},
nil,
3,
)
done := make(chan struct{})
go func() {
for {
select {
case <-store.NewEcShardsChan:
case <-store.NewVolumesChan:
case <-store.DeletedVolumesChan:
case <-store.DeletedEcShardsChan:
case <-store.StateUpdateChan:
case <-done:
return
}
}
}()
t.Cleanup(func() {
store.Close()
close(done)
})
return store
}
// TestMountEcShards_MissingEcxOnAllDisks covers the case where the EC
// distribute step has dropped shards on the volume server but no .ecx
// ever arrived (the operator-reported "Node 1 has 17 shard files but
// zero .ecx" pattern). MountEcShards used to bail on the first disk's
// "cannot open ec volume index" error — which is not os.ErrNotExist
// because NewEcVolume wrapped it with %v — and the operator saw a
// confusing per-disk error. The mount path now keeps scanning every
// local disk and, when none own an index, returns one clear error.
func TestMountEcShards_MissingEcxOnAllDisks(t *testing.T) {
tempDir := t.TempDir()
dir0 := filepath.Join(tempDir, "disk0")
dir1 := filepath.Join(tempDir, "disk1")
store := startEcMountStore(t, []string{dir0, dir1})
const collection = "mybucket"
vid := needle.VolumeId(11)
const dataShards = 10
const datSize int64 = 10 * 1024 * 1024
expectedShardSize := calculateExpectedShardSize(datSize, dataShards)
const shardOnDisk0 erasure_coding.ShardId = 6
// Plant a shard file on disk0 but no .ecx anywhere.
base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid))
f, err := os.Create(base0 + erasure_coding.ToExt(int(shardOnDisk0)))
if err != nil {
t.Fatalf("create shard: %v", err)
}
if err := f.Truncate(expectedShardSize); err != nil {
f.Close()
t.Fatalf("truncate shard: %v", err)
}
f.Close()
err = store.MountEcShards(collection, vid, shardOnDisk0, "")
if err == nil {
t.Fatalf("MountEcShards should fail when no .ecx exists on any disk")
}
if !strings.Contains(err.Error(), "no .ecx index found on any local disk") {
t.Errorf("expected aggregated 'no .ecx' error, got: %v", err)
}
}
// TestMountEcShards_ZeroByteEcxIsIgnored covers the user's "0-byte .ecx
// on one disk, valid .ecx on a sibling disk" case. writeToFile uses
// O_TRUNC and can leave an empty stub on a failed copy stream during EC
// distribute. The mount path must skip the stub and use the valid index
// from the sibling disk, not fail with "cannot open ec volume index".
func TestMountEcShards_ZeroByteEcxIsIgnored(t *testing.T) {
tempDir := t.TempDir()
dir0 := filepath.Join(tempDir, "disk0")
dir1 := filepath.Join(tempDir, "disk1")
store := startEcMountStore(t, []string{dir0, dir1})
const collection = "mybucket"
vid := needle.VolumeId(12)
const dataShards, parityShards = 10, 4
const datSize int64 = 10 * 1024 * 1024
expectedShardSize := calculateExpectedShardSize(datSize, dataShards)
const shardOnDisk0 erasure_coding.ShardId = 7
// shard on disk0; 0-byte .ecx stub on disk0; valid .ecx on disk1.
base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid))
f, err := os.Create(base0 + erasure_coding.ToExt(int(shardOnDisk0)))
if err != nil {
t.Fatalf("create shard: %v", err)
}
if err := f.Truncate(expectedShardSize); err != nil {
f.Close()
t.Fatalf("truncate shard: %v", err)
}
f.Close()
if err := os.WriteFile(base0+".ecx", nil, 0o644); err != nil {
t.Fatalf("write 0-byte .ecx stub on disk0: %v", err)
}
base1 := erasure_coding.EcShardFileName(collection, dir1, int(vid))
if err := os.WriteFile(base1+".ecx", make([]byte, 20), 0o644); err != nil {
t.Fatalf("write valid .ecx on disk1: %v", err)
}
if err := os.WriteFile(base1+".ecj", nil, 0o644); err != nil {
t.Fatalf("write .ecj: %v", err)
}
if err := volume_info.SaveVolumeInfo(base1+".vif", &volume_server_pb.VolumeInfo{
Version: uint32(needle.Version3),
DatFileSize: datSize,
EcShardConfig: &volume_server_pb.EcShardConfig{
DataShards: dataShards,
ParityShards: parityShards,
},
}); err != nil {
t.Fatalf("save .vif: %v", err)
}
if err := store.MountEcShards(collection, vid, shardOnDisk0, ""); err != nil {
t.Fatalf("MountEcShards should ignore the 0-byte .ecx stub and use disk1's valid index: %v", err)
}
loc0 := store.Locations[0]
ev, found := loc0.FindEcVolume(vid)
if !found {
t.Fatalf("EC volume %d not found on disk0", vid)
}
if got, want := filepath.Dir(ev.FileName(".ecx")), dir1; got != want {
t.Errorf("EcVolume .ecx resolved at %q, want %q (sibling disk with valid index)", got, want)
}
}
// TestMountEcShards_EmptyEcxMountsSuccessfully pins the empty-volume
// case: when an EC volume's .idx had no live entries at encode time,
// WriteSortedFileFromIdx legitimately produces a 0-byte .ecx. Mount
// must accept that as a valid empty index and let the EC volume come
// online — there is no reliable way to distinguish a legitimately
// empty .ecx from a stub left by a failed copy, and preventing the
// stub case is writeToFile's job at copy time, not NewEcVolume's at
// mount time. Pre-fix this returned "no .ecx index found on any local
// disk" because NewEcVolume mapped 0-byte to os.ErrNotExist.
func TestMountEcShards_EmptyEcxMountsSuccessfully(t *testing.T) {
tempDir := t.TempDir()
dir0 := filepath.Join(tempDir, "disk0")
dir1 := filepath.Join(tempDir, "disk1")
store := startEcMountStore(t, []string{dir0, dir1})
const collection = "mybucket"
vid := needle.VolumeId(13)
const dataShards, parityShards = 10, 4
const datSize int64 = 0
const shardOnDisk0 erasure_coding.ShardId = 0
// Plant a 0-byte shard, a 0-byte .ecx, an empty .ecj, and a .vif —
// the on-disk layout after encoding a volume that had no live needles.
base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid))
if err := os.WriteFile(base0+erasure_coding.ToExt(int(shardOnDisk0)), nil, 0o644); err != nil {
t.Fatalf("write 0-byte shard: %v", err)
}
if err := os.WriteFile(base0+".ecx", nil, 0o644); err != nil {
t.Fatalf("write 0-byte .ecx (empty volume's legitimate empty index): %v", err)
}
if err := os.WriteFile(base0+".ecj", nil, 0o644); err != nil {
t.Fatalf("write .ecj: %v", err)
}
if err := volume_info.SaveVolumeInfo(base0+".vif", &volume_server_pb.VolumeInfo{
Version: uint32(needle.Version3),
DatFileSize: datSize,
EcShardConfig: &volume_server_pb.EcShardConfig{
DataShards: dataShards,
ParityShards: parityShards,
},
}); err != nil {
t.Fatalf("save .vif: %v", err)
}
if err := store.MountEcShards(collection, vid, shardOnDisk0, ""); err != nil {
t.Fatalf("MountEcShards should accept a 0-byte .ecx as a valid empty index: %v", err)
}
loc0 := store.Locations[0]
if _, found := loc0.FindEcVolume(vid); !found {
t.Errorf("EC volume %d expected to be loaded on disk0 after mount", vid)
}
}
// TestIndexEcxOwners_IgnoresZeroByteStub guards the reconcile owner
// selection: a 0-byte .ecx on the orphan disk must not win against a
// valid .ecx on a sibling disk, otherwise reconcileEcShardsAcrossDisks
// would point loaders at the stub and leave orphan shards unloaded.
// Same invariant as findEcxIdxDirForVolume, but for the bulk scan path.
func TestIndexEcxOwners_IgnoresZeroByteStub(t *testing.T) {
tempDir := t.TempDir()
dir0 := filepath.Join(tempDir, "disk0") // 0-byte .ecx stub
dir1 := filepath.Join(tempDir, "disk1") // valid .ecx
store := startEcMountStore(t, []string{dir0, dir1})
const collection = "mybucket"
vid := needle.VolumeId(99)
base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid))
if err := os.WriteFile(base0+".ecx", nil, 0o644); err != nil {
t.Fatalf("write 0-byte .ecx stub on disk0: %v", err)
}
base1 := erasure_coding.EcShardFileName(collection, dir1, int(vid))
if err := os.WriteFile(base1+".ecx", make([]byte, 20), 0o644); err != nil {
t.Fatalf("write valid .ecx on disk1: %v", err)
}
owners := store.indexEcxOwners()
owner, ok := owners[ecKeyForReconcile{collection: collection, vid: vid}]
if !ok {
t.Fatalf("indexEcxOwners did not find any owner; expected disk1 with the valid .ecx")
}
if owner.location != store.Locations[1] {
t.Errorf("indexEcxOwners chose disk %s as owner; want disk1 (which holds the valid .ecx, not the 0-byte stub)", owner.location.Directory)
}
}

View File

@@ -127,7 +127,11 @@ func (s *Store) findEcxIdxDirForVolume(collection string, vid needle.VolumeId) (
}
seen[scan] = true
base := erasure_coding.EcShardFileName(collection, scan, int(vid))
if info, err := os.Stat(base + ".ecx"); err == nil && !info.IsDir() {
// A 0-byte .ecx is not a usable index — EC distribute's writeToFile
// opens with O_TRUNC and can leave a stub on a mid-stream failure.
// Treat it the same as absent so the scan continues to a sibling
// disk that may hold a valid index.
if info, err := os.Stat(base + ".ecx"); err == nil && !info.IsDir() && info.Size() > 0 {
return scan, true
}
}
@@ -163,6 +167,17 @@ func (s *Store) indexEcxOwners() map[ecKeyForReconcile]ecxOwnerInfo {
if !strings.HasSuffix(name, ".ecx") {
continue
}
// A 0-byte .ecx is a corrupt stub from a failed copy and
// not a credible owner — skip it so the scan keeps looking
// for a real index on a sibling disk. Without this, an
// orphan-shard reconcile could pick the stub as owner and
// point NewEcVolume at it, which now fails by design
// (NewEcVolume rejects 0-byte .ecx), leaving the orphan
// shards unloaded even when a valid index exists nearby.
info, statErr := entry.Info()
if statErr != nil || info.Size() == 0 {
continue
}
base := name[:len(name)-len(".ecx")]
collection, vid, err := parseCollectionVolumeId(base)
if err != nil {