diff --git a/sw-block/design/v2-rebuild-mvp-session-protocol.md b/sw-block/design/v2-rebuild-mvp-session-protocol.md index 82462c687..acfc1a1b3 100644 --- a/sw-block/design/v2-rebuild-mvp-session-protocol.md +++ b/sw-block/design/v2-rebuild-mvp-session-protocol.md @@ -348,6 +348,23 @@ Current MVP implementation choices: 3. reuse the existing rebuild TCP path for `sessionData` rather than inventing a new transport first +Current server-layer skeleton: + +1. `BlockService.StartReplicaRebuildSession(path, config)` +2. `BlockService.ApplyReplicaRebuildWALEntry(path, session_id, entry)` +3. `BlockService.ApplyReplicaRebuildBaseBlock(path, session_id, lba, data)` +4. `BlockService.MarkReplicaRebuildBaseComplete(path, session_id, total_blocks)` +5. `BlockService.TryCompleteReplicaRebuildSession(path, session_id)` +6. `BlockService.CancelReplicaRebuildSession(path, session_id, reason)` +7. `BlockService.ReplicaRebuildSession(path)` + +Server-layer responsibility: + +1. decode incoming `sessionControl` / `walData` / `sessionData` +2. map them onto the local volume path +3. route them into the `BlockService` skeleton above +4. build `sessionAck` from `ReplicaRebuildSession(path)` + ## Replica State Machine ```mermaid diff --git a/weed/server/block_rebuild_session.go b/weed/server/block_rebuild_session.go new file mode 100644 index 000000000..52405e793 --- /dev/null +++ b/weed/server/block_rebuild_session.go @@ -0,0 +1,126 @@ +package weed_server + +import ( + "fmt" + + "github.com/seaweedfs/seaweedfs/weed/storage/blockvol" +) + +// ReplicaRebuildSessionSnapshot is the host-visible rebuild session view for one +// local replica volume. +// +// This is the server-layer skeleton for future transport wiring: +// - sessionControl(start_rebuild) -> StartReplicaRebuildSession +// - walData -> ApplyReplicaRebuildWALEntry +// - sessionData -> ApplyReplicaRebuildBaseBlock +// - sessionData EOF -> MarkReplicaRebuildBaseComplete +// - sessionAck/progress poll -> ReplicaRebuildSession +// - completion gate -> TryCompleteReplicaRebuildSession +// - cancel/supersede -> CancelReplicaRebuildSession +// +// These hooks intentionally do not decode wire messages or decide protocol +// semantics. They only route host-side intent into the local BlockVol surface. +type ReplicaRebuildSessionSnapshot struct { + Path string + Config blockvol.RebuildSessionConfig + Progress blockvol.RebuildSessionProgress +} + +// StartReplicaRebuildSession installs one local rebuild session for the +// specified replica volume. +func (bs *BlockService) StartReplicaRebuildSession(path string, config blockvol.RebuildSessionConfig) error { + if bs == nil || bs.blockStore == nil { + return fmt.Errorf("block service not enabled") + } + return bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error { + return vol.StartRebuildSession(config) + }) +} + +// ApplyReplicaRebuildWALEntry routes one WAL data message into the active local +// rebuild session after session ID validation. +func (bs *BlockService) ApplyReplicaRebuildWALEntry(path string, sessionID uint64, entry *blockvol.WALEntry) error { + if bs == nil || bs.blockStore == nil { + return fmt.Errorf("block service not enabled") + } + return bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error { + return vol.ApplyRebuildSessionWALEntry(sessionID, entry) + }) +} + +// ApplyReplicaRebuildBaseBlock routes one base-lane block into the active local +// rebuild session after session ID validation. +func (bs *BlockService) ApplyReplicaRebuildBaseBlock(path string, sessionID uint64, lba uint64, data []byte) (bool, error) { + if bs == nil || bs.blockStore == nil { + return false, fmt.Errorf("block service not enabled") + } + applied := false + err := bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error { + var err error + applied, err = vol.ApplyRebuildSessionBaseBlock(sessionID, lba, data) + return err + }) + return applied, err +} + +// MarkReplicaRebuildBaseComplete closes the base-copy lane for one local +// rebuild session. +func (bs *BlockService) MarkReplicaRebuildBaseComplete(path string, sessionID uint64, totalBlocks uint64) error { + if bs == nil || bs.blockStore == nil { + return fmt.Errorf("block service not enabled") + } + return bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error { + return vol.MarkRebuildSessionBaseComplete(sessionID, totalBlocks) + }) +} + +// TryCompleteReplicaRebuildSession evaluates the local dual-lane completion +// gate for one rebuild session. +func (bs *BlockService) TryCompleteReplicaRebuildSession(path string, sessionID uint64) (uint64, bool, error) { + if bs == nil || bs.blockStore == nil { + return 0, false, fmt.Errorf("block service not enabled") + } + var achieved uint64 + var completed bool + err := bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error { + var err error + achieved, completed, err = vol.TryCompleteRebuildSession(sessionID) + return err + }) + return achieved, completed, err +} + +// CancelReplicaRebuildSession cancels the local rebuild session. Used by +// explicit cancel, supersede, or transport teardown paths. +func (bs *BlockService) CancelReplicaRebuildSession(path string, sessionID uint64, reason string) error { + if bs == nil || bs.blockStore == nil { + return fmt.Errorf("block service not enabled") + } + return bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error { + return vol.CancelRebuildSession(sessionID, reason) + }) +} + +// ReplicaRebuildSession returns the current local rebuild session snapshot for +// sessionAck construction or diagnostics. +func (bs *BlockService) ReplicaRebuildSession(path string) (ReplicaRebuildSessionSnapshot, bool, error) { + if bs == nil || bs.blockStore == nil { + return ReplicaRebuildSessionSnapshot{}, false, fmt.Errorf("block service not enabled") + } + var snap ReplicaRebuildSessionSnapshot + var ok bool + err := bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error { + cfg, progress, found := vol.ActiveRebuildSession() + if !found { + return nil + } + ok = true + snap = ReplicaRebuildSessionSnapshot{ + Path: path, + Config: cfg, + Progress: progress, + } + return nil + }) + return snap, ok, err +} diff --git a/weed/server/volume_server_block_test.go b/weed/server/volume_server_block_test.go index 1215df5a6..fe94a04b2 100644 --- a/weed/server/volume_server_block_test.go +++ b/weed/server/volume_server_block_test.go @@ -1109,6 +1109,107 @@ func TestBlockService_ApplyAssignments_RebuildingRole_PreservesLegacyFallbackWit } } +func TestBlockService_ReplicaRebuildSessionSkeleton_RoutesToVolume(t *testing.T) { + bs := newTestBlockServiceDirect(t) + path := createTestVolDirect(t, bs, "vol-rebuild-session-skeleton") + + if err := bs.StartReplicaRebuildSession(path, blockvol.RebuildSessionConfig{ + SessionID: 7, + Epoch: 1, + BaseLSN: 1, + TargetLSN: 1, + }); err != nil { + t.Fatalf("start rebuild session: %v", err) + } + + snap, ok, err := bs.ReplicaRebuildSession(path) + if err != nil { + t.Fatalf("read rebuild session: %v", err) + } + if !ok { + t.Fatal("expected active rebuild session") + } + if snap.Config.SessionID != 7 { + t.Fatalf("session_id=%d, want 7", snap.Config.SessionID) + } + if snap.Progress.Phase != blockvol.RebuildPhaseRunning { + t.Fatalf("phase=%s, want running", snap.Progress.Phase) + } + + if err := bs.ApplyReplicaRebuildWALEntry(path, 7, &blockvol.WALEntry{ + LSN: 1, + Epoch: 1, + Type: blockvol.EntryTypeWrite, + LBA: 0, + Length: 4096, + Data: bytes.Repeat([]byte{0xAB}, 4096), + }); err != nil { + t.Fatalf("apply rebuild WAL entry: %v", err) + } + + applied, err := bs.ApplyReplicaRebuildBaseBlock(path, 7, 0, bytes.Repeat([]byte{0xCD}, 4096)) + if err != nil { + t.Fatalf("apply rebuild base block: %v", err) + } + if applied { + t.Fatal("expected base block to be skipped after WAL apply") + } + + if err := bs.MarkReplicaRebuildBaseComplete(path, 7, 1); err != nil { + t.Fatalf("mark base complete: %v", err) + } + achieved, completed, err := bs.TryCompleteReplicaRebuildSession(path, 7) + if err != nil { + t.Fatalf("try complete: %v", err) + } + if !completed || achieved != 1 { + t.Fatalf("achieved=%d completed=%v, want achieved=1 completed=true", achieved, completed) + } + + snap, ok, err = bs.ReplicaRebuildSession(path) + if err != nil { + t.Fatalf("read completed session: %v", err) + } + if !ok || !snap.Progress.Completed() { + t.Fatalf("session completed=%v ok=%v", snap.Progress.Completed(), ok) + } + + if err := bs.CancelReplicaRebuildSession(path, 7, "test_done"); err != nil { + t.Fatalf("cancel rebuild session: %v", err) + } + if _, ok, err := bs.ReplicaRebuildSession(path); err != nil { + t.Fatalf("read cleared session: %v", err) + } else if ok { + t.Fatal("expected no active rebuild session after cancel") + } +} + +func TestBlockService_ReplicaRebuildSessionSkeleton_RejectsStaleSessionID(t *testing.T) { + bs := newTestBlockServiceDirect(t) + path := createTestVolDirect(t, bs, "vol-rebuild-session-stale") + + if err := bs.StartReplicaRebuildSession(path, blockvol.RebuildSessionConfig{ + SessionID: 9, + Epoch: 1, + BaseLSN: 1, + TargetLSN: 1, + }); err != nil { + t.Fatalf("start rebuild session: %v", err) + } + + err := bs.ApplyReplicaRebuildWALEntry(path, 8, &blockvol.WALEntry{ + LSN: 1, + Epoch: 1, + Type: blockvol.EntryTypeWrite, + LBA: 0, + Length: 4096, + Data: bytes.Repeat([]byte{0xEF}, 4096), + }) + if err == nil { + t.Fatal("expected stale session ID to be rejected") + } +} + func TestBlockService_BarrierRejected_ExecutesCoreInvalidateSession(t *testing.T) { bs := newTestBlockServiceDirect(t) bs.v2Bridge = newTestControlBridge()