mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-19 16:21:28 +00:00
fix(ec): mirror EC sidecars onto every shard-bearing disk at startup
In a multi-disk volume server, ec.balance and ec.rebuild can land shards on a disk that does not also hold the matching .ecx / .ecj / .vif index files. The orphan-shard reconciler in reconcileEcShardsAcrossDisks already loads those shards by pointing the EcVolume at the sibling disk's index files; reads work, but any failure on the index-owning disk silently disables every shard on the other disk, even though those shards are physically fine. This change adds mirrorEcMetadataToShardDisks, a startup pass that physically replicates .ecx / .ecj / .vif onto each disk that holds shards but is missing them. Each copy is atomic (tmp + fsync + rename) and idempotent (a destination that already has the sidecar is preserved). After mirroring, the cross-disk reconciler prefers the local IdxDirectory so the EcVolume mounts self-contained; the cross-disk virtual mount remains as a fallback for volumes whose mirror failed (read-only target, out of space, partial copy on a previous boot). The same-disk invariant the EC lifecycle (encode / decode / balance / vacuum / repair) was already documented as promising is now actually restored at boot, so a future failure of one disk in a split-shards layout no longer takes the other disk's shards with it. Tests cover the orphan-layout mirror (dir0 receives the .ecx / .ecj / .vif from dir1) and idempotency (an existing destination .ecx is not overwritten with the owner's copy).
This commit is contained in:
@@ -164,13 +164,26 @@ func NewStore(
|
||||
// we just cleaned up.
|
||||
s.pruneIncompleteEcWithSiblingDat()
|
||||
|
||||
// Before the cross-disk virtual-mount fallback runs, physically copy
|
||||
// EC sidecars (.ecx / .ecj / .vif) onto every disk that holds shards
|
||||
// but is missing them. The EC lifecycle (encode / decode / balance /
|
||||
// vacuum / repair) promises a same-disk layout: every shard lives
|
||||
// alongside its own metadata. Mirroring at boot restores that
|
||||
// invariant after ec.balance or ec.rebuild has split shards from
|
||||
// their index across disks of the same volume server, so each disk
|
||||
// can mount self-contained instead of reaching across to a sibling.
|
||||
s.mirrorEcMetadataToShardDisks()
|
||||
|
||||
// After every DiskLocation has finished its per-disk EC scan, sweep the
|
||||
// store for shards that live on a disk without local index files and
|
||||
// load them by reaching across to a sibling disk's .ecx / .ecj / .vif.
|
||||
// This is the volume-server side of issue #9212: ec.balance can move
|
||||
// shards onto a destination node's second disk while leaving the index
|
||||
// on the disk that already held the volume, and without this pass those
|
||||
// orphan shards stay invisible to the master.
|
||||
// orphan shards stay invisible to the master. Even after the mirror
|
||||
// pass above, this stays as the fallback for volumes whose mirror
|
||||
// failed (read-only target disk, partial copy, etc.) so the cluster
|
||||
// stays available.
|
||||
s.reconcileEcShardsAcrossDisks()
|
||||
|
||||
// Resolve state.pb's directory via the first disk location so it inherits
|
||||
|
||||
246
weed/storage/store_ec_mirror.go
Normal file
246
weed/storage/store_ec_mirror.go
Normal file
@@ -0,0 +1,246 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
)
|
||||
|
||||
// ecMirroredSidecars is the set of EC metadata files that must travel
|
||||
// with the shards. Each disk that holds .ec?? files for a
|
||||
// (collection, vid) on this volume server is expected to also hold a
|
||||
// local copy of these files so the EcVolume can mount self-contained —
|
||||
// without reaching across to a sibling disk for its index, journal, or
|
||||
// volume info.
|
||||
//
|
||||
// `.ecx` is the sorted needle index produced at encode time.
|
||||
// `.ecj` is the deletion journal appended on every blob delete.
|
||||
// `.vif` is the VolumeInfo (version, datFileSize, EC ratio, ttl).
|
||||
//
|
||||
// Listed in the order NewEcVolume opens them at mount, so a partial
|
||||
// mirror can be diagnosed by walking the list and stopping at the
|
||||
// first destination that does not yet have its copy.
|
||||
var ecMirroredSidecars = []string{".ecx", ".ecj", ".vif"}
|
||||
|
||||
// mirrorEcMetadataToShardDisks copies the EC sidecar files (.ecx,
|
||||
// .ecj, .vif) onto every disk of this store that holds EC shards but
|
||||
// is missing the matching sidecars. The goal is the same-disk
|
||||
// invariant promised by the EC lifecycle: encode / decode / balance /
|
||||
// vacuum / repair must leave every shard alongside its own metadata,
|
||||
// so the EcVolume can mount without reaching across to a sibling
|
||||
// disk.
|
||||
//
|
||||
// Without this pass, a disk that received shards through ec.balance
|
||||
// or ec.rebuild — where only the first shard carries
|
||||
// CopyEcxFile=true and subsequent shards land via auto-select — can
|
||||
// end up with the .ec?? files but no local .ecx / .ecj / .vif. The
|
||||
// orphan-shard reconciler in reconcileEcShardsAcrossDisks then mounts
|
||||
// the shards by pointing the EcVolume at the sibling disk's index
|
||||
// files. Reads work, but any failure on the index-owning disk (a
|
||||
// removed drive, a corrupted ext4, an operator who unmounted the
|
||||
// wrong sled) takes the shards with it.
|
||||
//
|
||||
// Mirroring physically replicates the sidecars onto each
|
||||
// shard-bearing disk so the shards stay readable as long as their own
|
||||
// disk is up. Idempotent: a destination that already has its sidecar
|
||||
// is skipped, and a missing source sidecar is not an error.
|
||||
//
|
||||
// Runs before reconcileEcShardsAcrossDisks so the orphan-shard pass
|
||||
// sees the freshly-mirrored layout and can mount each disk against
|
||||
// its own IdxDirectory rather than falling back to the cross-disk
|
||||
// path. The cross-disk fallback is preserved for cases where
|
||||
// mirroring fails (out of disk space, read-only target, etc.) so the
|
||||
// volume stays available.
|
||||
func (s *Store) mirrorEcMetadataToShardDisks() {
|
||||
if len(s.Locations) < 2 {
|
||||
return
|
||||
}
|
||||
|
||||
ecxOwners := s.indexEcxOwners()
|
||||
if len(ecxOwners) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for _, loc := range s.Locations {
|
||||
orphans := loc.collectOrphanEcShards()
|
||||
if len(orphans) == 0 {
|
||||
continue
|
||||
}
|
||||
for key := range orphans {
|
||||
owner, ok := ecxOwners[key]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
// Same-disk owner: the per-disk loader will have already
|
||||
// surfaced any failure. Mirroring to the same disk is a
|
||||
// no-op anyway (source == destination).
|
||||
if owner.location == loc {
|
||||
continue
|
||||
}
|
||||
// Skip if this destination already has every sidecar — common
|
||||
// when mirroring ran successfully on a previous startup and
|
||||
// the operator hasn't disturbed the layout.
|
||||
if loc.hasAllEcSidecarsLocally(key.collection, key.vid) {
|
||||
continue
|
||||
}
|
||||
copied, err := loc.mirrorEcSidecarsFrom(owner, key.collection, key.vid)
|
||||
if err != nil {
|
||||
// Don't abort the whole pass on a single failure: the
|
||||
// cross-disk reconciler downstream still gives this
|
||||
// volume a working (if non-mirrored) mount.
|
||||
glog.Warningf("ec volume %d (collection=%q): mirror sidecars from %s to %s failed after %d files: %v; cross-disk fallback will handle this volume",
|
||||
key.vid, key.collection, owner.location.Directory, loc.Directory, copied, err)
|
||||
continue
|
||||
}
|
||||
if copied > 0 {
|
||||
glog.V(0).Infof("ec volume %d (collection=%q): mirrored %d sidecar(s) from %s to %s for same-disk invariant",
|
||||
key.vid, key.collection, copied, owner.location.Directory, loc.Directory)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// hasAllEcSidecarsLocally reports whether this disk already has every
|
||||
// EC sidecar that ecMirroredSidecars expects, in the directory where
|
||||
// NewEcVolume looks for it (IdxDirectory for .ecx/.ecj, Directory for
|
||||
// .vif).
|
||||
func (l *DiskLocation) hasAllEcSidecarsLocally(collection string, vid needle.VolumeId) bool {
|
||||
for _, ext := range ecMirroredSidecars {
|
||||
dst := l.ecSidecarDestPath(collection, vid, ext)
|
||||
if _, err := os.Stat(dst); err != nil {
|
||||
// Either NotExist or another error — pessimistically treat
|
||||
// as missing so the caller attempts a fresh mirror.
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// ecSidecarDestPath returns where on this disk a given EC sidecar
|
||||
// extension is expected to live. Keeps the routing logic close to
|
||||
// NewEcVolume's lookup order:
|
||||
//
|
||||
// .ecx / .ecj → IdxDirectory (sorted index, deletion journal)
|
||||
// .vif → Directory (volume info next to .dat / shards)
|
||||
func (l *DiskLocation) ecSidecarDestPath(collection string, vid needle.VolumeId, ext string) string {
|
||||
if ext == ".vif" {
|
||||
return erasure_coding.EcShardFileName(collection, l.Directory, int(vid)) + ext
|
||||
}
|
||||
return erasure_coding.EcShardFileName(collection, l.IdxDirectory, int(vid)) + ext
|
||||
}
|
||||
|
||||
// mirrorEcSidecarsFrom copies every sidecar in ecMirroredSidecars
|
||||
// from the owner disk to this disk. Returns the count of files
|
||||
// successfully copied — a non-zero count with a non-nil error
|
||||
// indicates a partial mirror, which the caller logs but otherwise
|
||||
// leaves to the cross-disk fallback.
|
||||
//
|
||||
// Source resolution per file: check the owner's recorded idxDir first
|
||||
// (matches indexEcxOwners' "where I actually found .ecx" record),
|
||||
// then fall back to the owner's data Directory. This matches
|
||||
// removeEcVolumeFiles' two-directory cleanup contract — we read from
|
||||
// wherever the owner actually put the file.
|
||||
//
|
||||
// Each copy is atomic: write to <dst>.tmp, fsync, rename. A
|
||||
// pre-existing destination is left untouched (no overwrite), so a
|
||||
// half-finished mirror from a previous startup is healed file-by-file
|
||||
// without rewriting bytes that are already correct.
|
||||
func (l *DiskLocation) mirrorEcSidecarsFrom(owner ecxOwnerInfo, collection string, vid needle.VolumeId) (int, error) {
|
||||
srcIdxBase := erasure_coding.EcShardFileName(collection, owner.idxDir, int(vid))
|
||||
srcDataBase := erasure_coding.EcShardFileName(collection, owner.location.Directory, int(vid))
|
||||
|
||||
copied := 0
|
||||
for _, ext := range ecMirroredSidecars {
|
||||
dst := l.ecSidecarDestPath(collection, vid, ext)
|
||||
// Don't overwrite a sidecar that's already present: an
|
||||
// existing local copy is authoritative (it may be newer than
|
||||
// the owner's after a delete journal append).
|
||||
if _, err := os.Stat(dst); err == nil {
|
||||
continue
|
||||
} else if !os.IsNotExist(err) {
|
||||
return copied, fmt.Errorf("stat %s: %w", dst, err)
|
||||
}
|
||||
|
||||
// Pick the source: idxDir first (where the owner reported
|
||||
// finding .ecx), then the owner's data dir as a fallback for
|
||||
// .vif (and for legacy layouts that left .ecx in Directory).
|
||||
var src string
|
||||
for _, candidate := range []string{srcIdxBase + ext, srcDataBase + ext} {
|
||||
if info, err := os.Stat(candidate); err == nil && !info.IsDir() {
|
||||
src = candidate
|
||||
break
|
||||
}
|
||||
}
|
||||
if src == "" {
|
||||
glog.V(1).Infof("ec volume %d (collection=%q): sidecar %s not found on owner %s; skipping mirror",
|
||||
vid, collection, ext, owner.location.Directory)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := copyEcSidecarAtomic(src, dst); err != nil {
|
||||
return copied, fmt.Errorf("copy %s: %w", ext, err)
|
||||
}
|
||||
copied++
|
||||
}
|
||||
return copied, nil
|
||||
}
|
||||
|
||||
// copyEcSidecarAtomic copies src to dst via a sibling .tmp file and a
|
||||
// rename, fsyncing the data before the rename. Crash-safe: a partial
|
||||
// write leaves the .tmp orphaned (cleaned up on the next mirror pass
|
||||
// via the os.Remove on failure) and the canonical dst stays absent so
|
||||
// a retry recognises it still needs to copy.
|
||||
//
|
||||
// Caller has already verified dst does not exist; we still pass
|
||||
// O_EXCL on the tmp create so a concurrent mirror can't both fight
|
||||
// for the same temp path.
|
||||
func copyEcSidecarAtomic(src, dst string) error {
|
||||
srcFile, err := os.Open(src)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open source %s: %w", src, err)
|
||||
}
|
||||
defer srcFile.Close()
|
||||
|
||||
// Make sure the destination directory exists. Locations created
|
||||
// before -dir.idx was set may not have the idx tree yet on a
|
||||
// fresh disk; MkdirAll is a no-op when it already does.
|
||||
if err := os.MkdirAll(filepath.Dir(dst), 0o755); err != nil {
|
||||
return fmt.Errorf("mkdir %s: %w", filepath.Dir(dst), err)
|
||||
}
|
||||
|
||||
tmpDst := dst + ".mirror.tmp"
|
||||
// O_EXCL guards against a stale tmp from a previous interrupted
|
||||
// run; if it's there, blow it away and retry.
|
||||
_ = os.Remove(tmpDst)
|
||||
dstFile, err := os.OpenFile(tmpDst, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create %s: %w", tmpDst, err)
|
||||
}
|
||||
cleanup := func() {
|
||||
_ = dstFile.Close()
|
||||
_ = os.Remove(tmpDst)
|
||||
}
|
||||
|
||||
if _, err := io.Copy(dstFile, srcFile); err != nil {
|
||||
cleanup()
|
||||
return fmt.Errorf("copy bytes %s -> %s: %w", src, tmpDst, err)
|
||||
}
|
||||
if err := dstFile.Sync(); err != nil {
|
||||
cleanup()
|
||||
return fmt.Errorf("fsync %s: %w", tmpDst, err)
|
||||
}
|
||||
if err := dstFile.Close(); err != nil {
|
||||
_ = os.Remove(tmpDst)
|
||||
return fmt.Errorf("close %s: %w", tmpDst, err)
|
||||
}
|
||||
if err := os.Rename(tmpDst, dst); err != nil {
|
||||
_ = os.Remove(tmpDst)
|
||||
return fmt.Errorf("rename %s -> %s: %w", tmpDst, dst, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
315
weed/storage/store_ec_mirror_test.go
Normal file
315
weed/storage/store_ec_mirror_test.go
Normal file
@@ -0,0 +1,315 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
|
||||
"github.com/seaweedfs/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
// TestMirrorEcMetadataOnStartup_PhysicallyCopiesSidecars sets up the
|
||||
// same orphan-shard layout as TestLoadEcShardsWhenIndexFilesOnDifferentDisk
|
||||
// (shards on dir0, sidecars on dir1) and asserts the additional
|
||||
// guarantee: after NewStore returns, dir0 has its OWN copy of every EC
|
||||
// sidecar so the EcVolume mounts self-contained instead of reaching
|
||||
// across to dir1.
|
||||
//
|
||||
// The mirror is the load-bearing piece for the EC lifecycle (encode /
|
||||
// decode / balance / vacuum / repair) same-disk invariant. Without it,
|
||||
// a future failure of dir1 (the index owner) silently disables every
|
||||
// shard on dir0, even though those shards are physically fine.
|
||||
func TestMirrorEcMetadataOnStartup_PhysicallyCopiesSidecars(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
dir0 := filepath.Join(tempDir, "data1")
|
||||
dir1 := filepath.Join(tempDir, "data2")
|
||||
for _, d := range []string{dir0, dir1} {
|
||||
if err := os.MkdirAll(d, 0o755); err != nil {
|
||||
t.Fatalf("mkdir %s: %v", d, err)
|
||||
}
|
||||
}
|
||||
|
||||
collection := "video-recordings"
|
||||
vid := needle.VolumeId(4121)
|
||||
const dataShards, parityShards = 10, 4
|
||||
const datSize int64 = 10 * 1024 * 1024
|
||||
expectedShardSize := calculateExpectedShardSize(datSize, dataShards)
|
||||
|
||||
writeShard := func(dir string, shardId int) {
|
||||
t.Helper()
|
||||
base := erasure_coding.EcShardFileName(collection, dir, int(vid))
|
||||
f, err := os.Create(base + erasure_coding.ToExt(shardId))
|
||||
if err != nil {
|
||||
t.Fatalf("create shard %d in %s: %v", shardId, dir, err)
|
||||
}
|
||||
if err := f.Truncate(expectedShardSize); err != nil {
|
||||
f.Close()
|
||||
t.Fatalf("truncate shard %d in %s: %v", shardId, dir, err)
|
||||
}
|
||||
f.Close()
|
||||
}
|
||||
|
||||
// dir0: orphan shards 0 and 12; no sidecars locally yet.
|
||||
writeShard(dir0, 0)
|
||||
writeShard(dir0, 12)
|
||||
|
||||
// dir1: sidecars + shard 1.
|
||||
writeShard(dir1, 1)
|
||||
base1 := erasure_coding.EcShardFileName(collection, dir1, int(vid))
|
||||
|
||||
// Sentinel bytes in each sidecar so we can detect that the mirrored
|
||||
// copy on dir0 is byte-for-byte identical and not, say, a stub
|
||||
// created by NewEcVolume's "missing .vif" fallback.
|
||||
ecxBytes := bytes.Repeat([]byte{0xA1}, 20)
|
||||
ecjBytes := bytes.Repeat([]byte{0xB2}, 16)
|
||||
if err := os.WriteFile(base1+".ecx", ecxBytes, 0o644); err != nil {
|
||||
t.Fatalf("write .ecx: %v", err)
|
||||
}
|
||||
if err := os.WriteFile(base1+".ecj", ecjBytes, 0o644); err != nil {
|
||||
t.Fatalf("write .ecj: %v", err)
|
||||
}
|
||||
if err := volume_info.SaveVolumeInfo(base1+".vif", &volume_server_pb.VolumeInfo{
|
||||
Version: uint32(needle.Version3),
|
||||
DatFileSize: datSize,
|
||||
EcShardConfig: &volume_server_pb.EcShardConfig{
|
||||
DataShards: dataShards,
|
||||
ParityShards: parityShards,
|
||||
},
|
||||
}); err != nil {
|
||||
t.Fatalf("save .vif: %v", err)
|
||||
}
|
||||
|
||||
store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id",
|
||||
[]string{dir0, dir1},
|
||||
[]int32{100, 100},
|
||||
[]util.MinFreeSpace{{}, {}},
|
||||
"",
|
||||
NeedleMapInMemory,
|
||||
[]types.DiskType{types.HardDriveType, types.HardDriveType},
|
||||
nil,
|
||||
3,
|
||||
)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-store.NewEcShardsChan:
|
||||
case <-store.NewVolumesChan:
|
||||
case <-store.DeletedVolumesChan:
|
||||
case <-store.DeletedEcShardsChan:
|
||||
case <-store.StateUpdateChan:
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
t.Cleanup(func() {
|
||||
store.Close()
|
||||
close(done)
|
||||
})
|
||||
|
||||
base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid))
|
||||
|
||||
// dir0 must have received byte-identical copies of .ecx and .ecj.
|
||||
// (.vif is checked separately below: NewEcVolume's version
|
||||
// verification rewrites .vif when the mounted shard's header
|
||||
// disagrees with the recorded version, so byte-equality is not the
|
||||
// right invariant for .vif on synthetic-shard fixtures.)
|
||||
for _, ext := range []string{".ecx", ".ecj"} {
|
||||
src := base1 + ext
|
||||
dst := base0 + ext
|
||||
gotBytes, err := os.ReadFile(dst)
|
||||
if err != nil {
|
||||
t.Errorf("sidecar %s not mirrored to dir0: %v", ext, err)
|
||||
continue
|
||||
}
|
||||
wantBytes, err := os.ReadFile(src)
|
||||
if err != nil {
|
||||
t.Fatalf("read source %s: %v", src, err)
|
||||
}
|
||||
if !bytes.Equal(gotBytes, wantBytes) {
|
||||
t.Errorf("sidecar %s content mismatch between dir0 and dir1", ext)
|
||||
}
|
||||
}
|
||||
|
||||
// .vif must exist on dir0 and decode to the same EC ratio the
|
||||
// source advertised; the exact bytes can drift due to
|
||||
// version-correction on first mount.
|
||||
dir0Info, _, found, err := volume_info.MaybeLoadVolumeInfo(base0 + ".vif")
|
||||
if err != nil || !found {
|
||||
t.Errorf("dir0 .vif missing or unreadable after mirror: err=%v found=%v", err, found)
|
||||
} else if dir0Info.EcShardConfig == nil ||
|
||||
dir0Info.EcShardConfig.DataShards != dataShards ||
|
||||
dir0Info.EcShardConfig.ParityShards != parityShards {
|
||||
t.Errorf("dir0 .vif has wrong EC ratio after mirror: got %+v, want %d+%d",
|
||||
dir0Info.EcShardConfig, dataShards, parityShards)
|
||||
}
|
||||
|
||||
// The shards on dir0 must still be loaded — same guarantee as the
|
||||
// pre-mirror orphan reconciler — and the EcVolume's index handle
|
||||
// must resolve to dir0 itself, not to dir1.
|
||||
loc0 := store.Locations[0]
|
||||
ev0, found := loc0.FindEcVolume(vid)
|
||||
if !found {
|
||||
t.Fatalf("dir0 EcVolume %d not loaded after startup mirror", vid)
|
||||
}
|
||||
for _, sid := range []erasure_coding.ShardId{0, 12} {
|
||||
if _, ok := ev0.FindEcVolumeShard(sid); !ok {
|
||||
t.Errorf("shard %d not registered on dir0 after mirror", sid)
|
||||
}
|
||||
}
|
||||
if got, want := filepath.Dir(ev0.FileName(".ecx")), dir0; got != want {
|
||||
t.Errorf("dir0 EcVolume .ecx resolved at %q, want %q (local mirrored copy)", got, want)
|
||||
}
|
||||
|
||||
// dir1 must keep its own self-contained mount for shard 1; the
|
||||
// mirror does not touch the source.
|
||||
loc1 := store.Locations[1]
|
||||
ev1, found := loc1.FindEcVolume(vid)
|
||||
if !found {
|
||||
t.Fatalf("dir1 EcVolume %d not loaded after startup mirror", vid)
|
||||
}
|
||||
if _, ok := ev1.FindEcVolumeShard(1); !ok {
|
||||
t.Errorf("dir1 shard 1 missing after mirror")
|
||||
}
|
||||
|
||||
// Shards on dir0 must not have been destroyed by any cleanup path.
|
||||
for _, sid := range []int{0, 12} {
|
||||
shardPath := base0 + erasure_coding.ToExt(sid)
|
||||
fi, err := os.Stat(shardPath)
|
||||
if err != nil {
|
||||
t.Errorf("shard %d on dir0 lost after mirror: %v", sid, err)
|
||||
continue
|
||||
}
|
||||
if fi.Size() != expectedShardSize {
|
||||
t.Errorf("shard %d on dir0 truncated by mirror pass: size %d, want %d", sid, fi.Size(), expectedShardSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestMirrorEcMetadataOnStartup_NoOpWhenAlreadyMirrored guards against
|
||||
// the mirror pass rewriting sidecars that already match. Idempotency
|
||||
// matters because the pass runs on every startup; an unconditional
|
||||
// rewrite would slowly amplify into "fsync every EC volume's metadata
|
||||
// on boot," which is expensive on large clusters.
|
||||
func TestMirrorEcMetadataOnStartup_NoOpWhenAlreadyMirrored(t *testing.T) {
|
||||
tempDir := t.TempDir()
|
||||
dir0 := filepath.Join(tempDir, "data1")
|
||||
dir1 := filepath.Join(tempDir, "data2")
|
||||
for _, d := range []string{dir0, dir1} {
|
||||
if err := os.MkdirAll(d, 0o755); err != nil {
|
||||
t.Fatalf("mkdir %s: %v", d, err)
|
||||
}
|
||||
}
|
||||
|
||||
collection := "video-recordings"
|
||||
vid := needle.VolumeId(7777)
|
||||
const dataShards, parityShards = 10, 4
|
||||
const datSize int64 = 10 * 1024 * 1024
|
||||
expectedShardSize := calculateExpectedShardSize(datSize, dataShards)
|
||||
|
||||
plantShard := func(dir string, shardId int) {
|
||||
base := erasure_coding.EcShardFileName(collection, dir, int(vid))
|
||||
f, err := os.Create(base + erasure_coding.ToExt(shardId))
|
||||
if err != nil {
|
||||
t.Fatalf("create shard %d in %s: %v", shardId, dir, err)
|
||||
}
|
||||
if err := f.Truncate(expectedShardSize); err != nil {
|
||||
f.Close()
|
||||
t.Fatalf("truncate shard %d in %s: %v", shardId, dir, err)
|
||||
}
|
||||
f.Close()
|
||||
}
|
||||
|
||||
plantSidecars := func(dir string, ecx, ecj []byte) {
|
||||
base := erasure_coding.EcShardFileName(collection, dir, int(vid))
|
||||
if err := os.WriteFile(base+".ecx", ecx, 0o644); err != nil {
|
||||
t.Fatalf("write %s.ecx: %v", base, err)
|
||||
}
|
||||
if err := os.WriteFile(base+".ecj", ecj, 0o644); err != nil {
|
||||
t.Fatalf("write %s.ecj: %v", base, err)
|
||||
}
|
||||
if err := volume_info.SaveVolumeInfo(base+".vif", &volume_server_pb.VolumeInfo{
|
||||
Version: uint32(needle.Version3),
|
||||
DatFileSize: datSize,
|
||||
EcShardConfig: &volume_server_pb.EcShardConfig{
|
||||
DataShards: dataShards,
|
||||
ParityShards: parityShards,
|
||||
},
|
||||
}); err != nil {
|
||||
t.Fatalf("save %s.vif: %v", base, err)
|
||||
}
|
||||
}
|
||||
|
||||
plantShard(dir0, 0)
|
||||
plantShard(dir0, 12)
|
||||
plantShard(dir1, 1)
|
||||
|
||||
// Both dirs already have sidecars; we plant deliberately different
|
||||
// .ecx bytes so we can prove the mirror does NOT clobber the
|
||||
// pre-existing destination with the owner's copy. .ecx is the
|
||||
// strongest signal here — unlike .vif, EcVolume never rewrites
|
||||
// .ecx at mount, so any drift would be the mirror's fault.
|
||||
ecxOwner := bytes.Repeat([]byte{0xC3}, 20)
|
||||
ecxLocal := bytes.Repeat([]byte{0x5A}, 20)
|
||||
ecjBytes := bytes.Repeat([]byte{0xD4}, 16)
|
||||
plantSidecars(dir1, ecxOwner, ecjBytes)
|
||||
plantSidecars(dir0, ecxLocal, ecjBytes)
|
||||
|
||||
base0 := erasure_coding.EcShardFileName(collection, dir0, int(vid))
|
||||
|
||||
store := NewStore(nil, "localhost", 8080, 18080, "http://localhost:8080", "store-id",
|
||||
[]string{dir0, dir1},
|
||||
[]int32{100, 100},
|
||||
[]util.MinFreeSpace{{}, {}},
|
||||
"",
|
||||
NeedleMapInMemory,
|
||||
[]types.DiskType{types.HardDriveType, types.HardDriveType},
|
||||
nil,
|
||||
3,
|
||||
)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-store.NewEcShardsChan:
|
||||
case <-store.NewVolumesChan:
|
||||
case <-store.DeletedVolumesChan:
|
||||
case <-store.DeletedEcShardsChan:
|
||||
case <-store.StateUpdateChan:
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
t.Cleanup(func() {
|
||||
store.Close()
|
||||
close(done)
|
||||
})
|
||||
|
||||
postEcx, err := os.ReadFile(base0 + ".ecx")
|
||||
if err != nil {
|
||||
t.Fatalf("read dir0 .ecx after NewStore: %v", err)
|
||||
}
|
||||
if !bytes.Equal(ecxLocal, postEcx) {
|
||||
t.Errorf("dir0 .ecx was overwritten by mirror pass; pre-existing destination must be preserved (got first 4 bytes %x, want %x)",
|
||||
postEcx[:4], ecxLocal[:4])
|
||||
}
|
||||
|
||||
// And the volume must still mount on both disks with its own
|
||||
// local .ecx — the same-disk invariant the mirror is here to
|
||||
// preserve.
|
||||
if loc0 := store.Locations[0]; loc0 == nil {
|
||||
t.Fatal("loc0 unexpectedly nil")
|
||||
} else if ev, found := loc0.FindEcVolume(vid); !found {
|
||||
t.Errorf("dir0 EcVolume %d not loaded", vid)
|
||||
} else if got, want := filepath.Dir(ev.FileName(".ecx")), dir0; got != want {
|
||||
t.Errorf("dir0 .ecx resolved at %q, want %q", got, want)
|
||||
}
|
||||
}
|
||||
@@ -79,6 +79,21 @@ func (s *Store) reconcileEcShardsAcrossDisks() {
|
||||
key.vid, key.collection, loc.Directory, shards)
|
||||
continue
|
||||
}
|
||||
// If mirrorEcMetadataToShardDisks already copied the sidecars
|
||||
// onto this disk, prefer the local IdxDirectory: the shard
|
||||
// then mounts self-contained and survives any subsequent
|
||||
// failure of the owner disk. The cross-disk fallback below
|
||||
// stays as the rescue path for volumes whose mirror failed
|
||||
// (read-only target, out of space, partial copy on a previous
|
||||
// boot, etc.).
|
||||
if loc.HasEcxFileOnDisk(key.collection, key.vid) {
|
||||
glog.V(0).Infof("ec volume %d (collection=%q): loading orphan shards %v on %s against locally-mirrored sidecars",
|
||||
key.vid, key.collection, shards, loc.Directory)
|
||||
if err := loc.loadEcShards(shards, key.collection, key.vid, loc.ecShardNotifyHandler); err != nil {
|
||||
glog.Errorf("ec volume %d on %s: local-mirror shard load failed: %v", key.vid, loc.Directory, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if owner.location == loc {
|
||||
// .ecx is on this same disk, but loadAllEcShards still
|
||||
// did not load these shards — handleFoundEcxFile already
|
||||
|
||||
Reference in New Issue
Block a user