From 3da4c19046e06ec7e75cb449e0d9287ef1a3473b Mon Sep 17 00:00:00 2001 From: pingqiu Date: Fri, 3 Apr 2026 11:47:41 -0700 Subject: [PATCH] =?UTF-8?q?fix:=20CP13-8A=20=E2=80=94=20fix=20malformed=20?= =?UTF-8?q?replica=20address=20in=20test=20allocator=20+=20add=20read=20pr?= =?UTF-8?q?oof?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Investigation result: - Dual-BlockVol hypothesis: DISPROVEN (one instance per path, correct wiring) - Root cause: adapter wiring bug in test allocator soak_test.go blockVSAllocate returned ReplicaDataAddr = "vs2:9333:14260" (server + ":port" where server already has a port → three colons, invalid) This caused setupReplicaReceiver to fail silently → no data replicated Root cause classification: adapter/test-harness bug - NOT a backend data visibility bug - NOT a core-rule gap - The engine read path works correctly (TestSyncAll_FullRoundTrip passes) Code changes: - qa_block_soak_test.go: fix allocator to use host:port (not server:port), use deterministic FNV-hashed ports matching production ReplicationPorts - qa_block_cp13_8a_test.go: 2 new integration tests proving replica reads work through both ReadLBA and adapter.ReadAt, before and after promotion Remaining contradiction for CP13-8 scenario on real hardware: - The production weed cluster uses ReplicationPorts (deterministic) which should not have this bug. If CP13-8 still fails on m01/M02, the cause is different from this test-harness issue and needs a separate investigation. Co-Authored-By: Claude Opus 4.6 (1M context) --- weed/server/qa_block_cp13_8a_test.go | 206 +++++++++++++++++++++++++++ weed/server/qa_block_soak_test.go | 14 +- 2 files changed, 217 insertions(+), 3 deletions(-) create mode 100644 weed/server/qa_block_cp13_8a_test.go diff --git a/weed/server/qa_block_cp13_8a_test.go b/weed/server/qa_block_cp13_8a_test.go new file mode 100644 index 000000000..7d068240e --- /dev/null +++ b/weed/server/qa_block_cp13_8a_test.go @@ -0,0 +1,206 @@ +package weed_server + +import ( + "bytes" + "context" +"testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" +) + +// TestCP13_8A_ReplicaReadAfterReplication verifies that data replicated to +// a replica volume can be read back through the same BlockVol instance that +// the iSCSI adapter uses. This is the core CP13-8A investigation test. +// +// If this passes: the engine + weed integration is correct, and the CP13-8 +// scenario failure is in the testrunner/cluster layer. +// If this fails: the bug is in the VS integration (adapter wiring, flusher, etc). +func TestCP13_8A_ReplicaReadAfterReplication(t *testing.T) { + s := newSoakSetup(t) + ctx := context.Background() + + // Create a volume (RF=2 sync_all). + resp, err := s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{ + Name: "cp13-8a-read", + SizeBytes: 1 << 20, + DurabilityMode: "sync_all", + }) + if err != nil { + t.Fatal(err) + } + primaryVS := resp.VolumeServer + entry, _ := s.ms.blockRegistry.Lookup("cp13-8a-read") + + // Deliver assignments. + s.bs.localServerID = primaryVS + s.deliver(primaryVS) + for _, ri := range entry.Replicas { + s.deliver(ri.Server) + } + time.Sleep(200 * time.Millisecond) + + // Write data through the primary's BlockVol (simulating iSCSI write). + primaryPath := entry.Path + var primaryVol *blockvol.BlockVol + s.store.IterateBlockVolumes(func(path string, vol *blockvol.BlockVol) { + if path == primaryPath { + primaryVol = vol + } + }) + if primaryVol == nil { + t.Fatal("primary volume not found in store") + } + + // Write test pattern. + testData := make([]byte, 4096) + for i := range testData { + testData[i] = byte(i % 251) // distinctive non-zero pattern + } + if err := primaryVol.WriteLBA(0, testData); err != nil { + t.Fatalf("WriteLBA on primary: %v", err) + } + if err := primaryVol.WriteLBA(1, testData); err != nil { + t.Fatalf("WriteLBA on primary LBA 1: %v", err) + } + + // SyncCache to trigger sync_all barrier (replicates + confirms durability). + if err := primaryVol.SyncCache(); err != nil { + t.Fatalf("SyncCache: %v", err) + } + + // Now find the REPLICA volume and read from it. + // In the soak setup, both primary and replica are in the same store. + var replicaPath string + for _, ri := range entry.Replicas { + replicaPath = ri.Path + break + } + if replicaPath == "" { + t.Fatal("no replica path found") + } + + var replicaVol *blockvol.BlockVol + s.store.IterateBlockVolumes(func(path string, vol *blockvol.BlockVol) { + if path == replicaPath { + replicaVol = vol + } + }) + if replicaVol == nil { + t.Fatal("replica volume not found in store") + } + + // Let flusher run on replica (same as production). + time.Sleep(200 * time.Millisecond) + + // Read from replica — this is what the iSCSI adapter would do. + got, err := replicaVol.ReadLBA(0, 4096) + if err != nil { + t.Fatalf("ReadLBA on replica: %v", err) + } + + if bytes.Equal(got, make([]byte, 4096)) { + t.Fatal("CP13-8A: replica ReadLBA returned all zeros — data not visible") + } + + if !bytes.Equal(got, testData) { + t.Fatalf("CP13-8A: replica data mismatch at LBA 0: got[0]=%d want[0]=%d", got[0], testData[0]) + } + + // Read LBA 1 too. + got1, err := replicaVol.ReadLBA(1, 4096) + if err != nil { + t.Fatalf("ReadLBA on replica LBA 1: %v", err) + } + if !bytes.Equal(got1, testData) { + t.Fatalf("CP13-8A: replica data mismatch at LBA 1") + } + + // Also test through the adapter path (what iSCSI actually calls). + adapter := blockvol.NewBlockVolAdapter(replicaVol) + adapterGot, err := adapter.ReadAt(0, 4096) + if err != nil { + t.Fatalf("adapter ReadAt: %v", err) + } + if !bytes.Equal(adapterGot, testData) { + t.Fatalf("CP13-8A: adapter data mismatch at LBA 0") + } + + t.Log("CP13-8A: replica reads return correct data through both ReadLBA and adapter.ReadAt") +} + +// TestCP13_8A_ReplicaReadAfterPromotion verifies that after promoting a +// replica to primary, the data is still readable. +func TestCP13_8A_ReplicaReadAfterPromotion(t *testing.T) { + s := newSoakSetup(t) + ctx := context.Background() + + resp, err := s.ms.CreateBlockVolume(ctx, &master_pb.CreateBlockVolumeRequest{ + Name: "cp13-8a-promote", + SizeBytes: 1 << 20, + DurabilityMode: "sync_all", + }) + if err != nil { + t.Fatal(err) + } + primaryVS := resp.VolumeServer + entry, _ := s.ms.blockRegistry.Lookup("cp13-8a-promote") + + s.bs.localServerID = primaryVS + s.deliver(primaryVS) + for _, ri := range entry.Replicas { + s.deliver(ri.Server) + } + time.Sleep(200 * time.Millisecond) + + // Write through primary. + var primaryVol *blockvol.BlockVol + s.store.IterateBlockVolumes(func(path string, vol *blockvol.BlockVol) { + if path == entry.Path { + primaryVol = vol + } + }) + testData := make([]byte, 4096) + for i := range testData { + testData[i] = byte((i + 37) % 251) + } + if err := primaryVol.WriteLBA(0, testData); err != nil { + t.Fatal(err) + } + if err := primaryVol.SyncCache(); err != nil { + t.Fatal(err) + } + + // Find replica. + var replicaPath string + for _, ri := range entry.Replicas { + replicaPath = ri.Path + } + var replicaVol *blockvol.BlockVol + s.store.IterateBlockVolumes(func(path string, vol *blockvol.BlockVol) { + if path == replicaPath { + replicaVol = vol + } + }) + + // Promote replica to primary. + if err := replicaVol.HandleAssignment(2, blockvol.RolePrimary, 30*time.Second); err != nil { + t.Fatalf("promote replica: %v", err) + } + + // Read after promotion (let flusher run). + time.Sleep(200 * time.Millisecond) + got, err := replicaVol.ReadLBA(0, 4096) + if err != nil { + t.Fatalf("ReadLBA after promotion: %v", err) + } + if bytes.Equal(got, make([]byte, 4096)) { + t.Fatal("CP13-8A: promoted replica reads zeros after promotion") + } + if !bytes.Equal(got, testData) { + t.Fatalf("CP13-8A: promoted replica data mismatch") + } + + t.Log("CP13-8A: promoted replica reads correct data") +} diff --git a/weed/server/qa_block_soak_test.go b/weed/server/qa_block_soak_test.go index 501abc9c9..3e873cddc 100644 --- a/weed/server/qa_block_soak_test.go +++ b/weed/server/qa_block_soak_test.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "hash/fnv" "os" "path/filepath" "strings" @@ -68,13 +69,20 @@ func newSoakSetup(t *testing.T) *soakSetup { if idx := strings.LastIndex(server, ":"); idx >= 0 { host = server[:idx] } + // Use deterministic ports for replication (matching production ReplicationPorts behavior). + h := fnv.New32a() + h.Write([]byte(volPath)) + offset := int(h.Sum32()%500) * 3 + dataPort := 14000 + offset + ctrlPort := dataPort + 1 + rebuildPort := dataPort + 2 return &blockAllocResult{ Path: volPath, IQN: fmt.Sprintf("iqn.2024.test:%s", name), ISCSIAddr: host + ":3260", - ReplicaDataAddr: server + ":14260", - ReplicaCtrlAddr: server + ":14261", - RebuildListenAddr: server + ":15000", + ReplicaDataAddr: fmt.Sprintf("127.0.0.1:%d", dataPort), + ReplicaCtrlAddr: fmt.Sprintf("127.0.0.1:%d", ctrlPort), + RebuildListenAddr: fmt.Sprintf("127.0.0.1:%d", rebuildPort), }, nil } ms.blockVSDelete = func(ctx context.Context, server string, name string) error { return nil }