feat: server-layer rebuild session skeleton — host routing for MVP

Add BlockService replica-side rebuild routing API that bridges
transport/host layer to BlockVol session surface:

  StartReplicaRebuildSession(path, config)
  ApplyReplicaRebuildWALEntry(path, sessionID, entry)
  ApplyReplicaRebuildBaseBlock(path, sessionID, lba, data)
  MarkReplicaRebuildBaseComplete(path, sessionID, totalBlocks)
  TryCompleteReplicaRebuildSession(path, sessionID)
  CancelReplicaRebuildSession(path, sessionID, reason)
  ReplicaRebuildSession(path) → snapshot

Each method does one thing: validate → WithVolume → delegate to BlockVol.
No wire decoding, no protocol decisions, no state invention. Transport
wiring (sessionControl/walData/sessionData handlers) is the next step.

2 focused tests: skeleton routes correctly, stale session ID rejected.

Updated v2-rebuild-mvp-session-protocol.md with server skeleton section.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
pingqiu
2026-04-07 14:53:32 -07:00
parent d2d57851b0
commit 49845dd509
3 changed files with 244 additions and 0 deletions

View File

@@ -348,6 +348,23 @@ Current MVP implementation choices:
3. reuse the existing rebuild TCP path for `sessionData` rather than inventing
a new transport first
Current server-layer skeleton:
1. `BlockService.StartReplicaRebuildSession(path, config)`
2. `BlockService.ApplyReplicaRebuildWALEntry(path, session_id, entry)`
3. `BlockService.ApplyReplicaRebuildBaseBlock(path, session_id, lba, data)`
4. `BlockService.MarkReplicaRebuildBaseComplete(path, session_id, total_blocks)`
5. `BlockService.TryCompleteReplicaRebuildSession(path, session_id)`
6. `BlockService.CancelReplicaRebuildSession(path, session_id, reason)`
7. `BlockService.ReplicaRebuildSession(path)`
Server-layer responsibility:
1. decode incoming `sessionControl` / `walData` / `sessionData`
2. map them onto the local volume path
3. route them into the `BlockService` skeleton above
4. build `sessionAck` from `ReplicaRebuildSession(path)`
## Replica State Machine
```mermaid

View File

@@ -0,0 +1,126 @@
package weed_server
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/storage/blockvol"
)
// ReplicaRebuildSessionSnapshot is the host-visible rebuild session view for one
// local replica volume.
//
// This is the server-layer skeleton for future transport wiring:
// - sessionControl(start_rebuild) -> StartReplicaRebuildSession
// - walData -> ApplyReplicaRebuildWALEntry
// - sessionData -> ApplyReplicaRebuildBaseBlock
// - sessionData EOF -> MarkReplicaRebuildBaseComplete
// - sessionAck/progress poll -> ReplicaRebuildSession
// - completion gate -> TryCompleteReplicaRebuildSession
// - cancel/supersede -> CancelReplicaRebuildSession
//
// These hooks intentionally do not decode wire messages or decide protocol
// semantics. They only route host-side intent into the local BlockVol surface.
type ReplicaRebuildSessionSnapshot struct {
Path string
Config blockvol.RebuildSessionConfig
Progress blockvol.RebuildSessionProgress
}
// StartReplicaRebuildSession installs one local rebuild session for the
// specified replica volume.
func (bs *BlockService) StartReplicaRebuildSession(path string, config blockvol.RebuildSessionConfig) error {
if bs == nil || bs.blockStore == nil {
return fmt.Errorf("block service not enabled")
}
return bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error {
return vol.StartRebuildSession(config)
})
}
// ApplyReplicaRebuildWALEntry routes one WAL data message into the active local
// rebuild session after session ID validation.
func (bs *BlockService) ApplyReplicaRebuildWALEntry(path string, sessionID uint64, entry *blockvol.WALEntry) error {
if bs == nil || bs.blockStore == nil {
return fmt.Errorf("block service not enabled")
}
return bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error {
return vol.ApplyRebuildSessionWALEntry(sessionID, entry)
})
}
// ApplyReplicaRebuildBaseBlock routes one base-lane block into the active local
// rebuild session after session ID validation.
func (bs *BlockService) ApplyReplicaRebuildBaseBlock(path string, sessionID uint64, lba uint64, data []byte) (bool, error) {
if bs == nil || bs.blockStore == nil {
return false, fmt.Errorf("block service not enabled")
}
applied := false
err := bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error {
var err error
applied, err = vol.ApplyRebuildSessionBaseBlock(sessionID, lba, data)
return err
})
return applied, err
}
// MarkReplicaRebuildBaseComplete closes the base-copy lane for one local
// rebuild session.
func (bs *BlockService) MarkReplicaRebuildBaseComplete(path string, sessionID uint64, totalBlocks uint64) error {
if bs == nil || bs.blockStore == nil {
return fmt.Errorf("block service not enabled")
}
return bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error {
return vol.MarkRebuildSessionBaseComplete(sessionID, totalBlocks)
})
}
// TryCompleteReplicaRebuildSession evaluates the local dual-lane completion
// gate for one rebuild session.
func (bs *BlockService) TryCompleteReplicaRebuildSession(path string, sessionID uint64) (uint64, bool, error) {
if bs == nil || bs.blockStore == nil {
return 0, false, fmt.Errorf("block service not enabled")
}
var achieved uint64
var completed bool
err := bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error {
var err error
achieved, completed, err = vol.TryCompleteRebuildSession(sessionID)
return err
})
return achieved, completed, err
}
// CancelReplicaRebuildSession cancels the local rebuild session. Used by
// explicit cancel, supersede, or transport teardown paths.
func (bs *BlockService) CancelReplicaRebuildSession(path string, sessionID uint64, reason string) error {
if bs == nil || bs.blockStore == nil {
return fmt.Errorf("block service not enabled")
}
return bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error {
return vol.CancelRebuildSession(sessionID, reason)
})
}
// ReplicaRebuildSession returns the current local rebuild session snapshot for
// sessionAck construction or diagnostics.
func (bs *BlockService) ReplicaRebuildSession(path string) (ReplicaRebuildSessionSnapshot, bool, error) {
if bs == nil || bs.blockStore == nil {
return ReplicaRebuildSessionSnapshot{}, false, fmt.Errorf("block service not enabled")
}
var snap ReplicaRebuildSessionSnapshot
var ok bool
err := bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error {
cfg, progress, found := vol.ActiveRebuildSession()
if !found {
return nil
}
ok = true
snap = ReplicaRebuildSessionSnapshot{
Path: path,
Config: cfg,
Progress: progress,
}
return nil
})
return snap, ok, err
}

View File

@@ -1109,6 +1109,107 @@ func TestBlockService_ApplyAssignments_RebuildingRole_PreservesLegacyFallbackWit
}
}
func TestBlockService_ReplicaRebuildSessionSkeleton_RoutesToVolume(t *testing.T) {
bs := newTestBlockServiceDirect(t)
path := createTestVolDirect(t, bs, "vol-rebuild-session-skeleton")
if err := bs.StartReplicaRebuildSession(path, blockvol.RebuildSessionConfig{
SessionID: 7,
Epoch: 1,
BaseLSN: 1,
TargetLSN: 1,
}); err != nil {
t.Fatalf("start rebuild session: %v", err)
}
snap, ok, err := bs.ReplicaRebuildSession(path)
if err != nil {
t.Fatalf("read rebuild session: %v", err)
}
if !ok {
t.Fatal("expected active rebuild session")
}
if snap.Config.SessionID != 7 {
t.Fatalf("session_id=%d, want 7", snap.Config.SessionID)
}
if snap.Progress.Phase != blockvol.RebuildPhaseRunning {
t.Fatalf("phase=%s, want running", snap.Progress.Phase)
}
if err := bs.ApplyReplicaRebuildWALEntry(path, 7, &blockvol.WALEntry{
LSN: 1,
Epoch: 1,
Type: blockvol.EntryTypeWrite,
LBA: 0,
Length: 4096,
Data: bytes.Repeat([]byte{0xAB}, 4096),
}); err != nil {
t.Fatalf("apply rebuild WAL entry: %v", err)
}
applied, err := bs.ApplyReplicaRebuildBaseBlock(path, 7, 0, bytes.Repeat([]byte{0xCD}, 4096))
if err != nil {
t.Fatalf("apply rebuild base block: %v", err)
}
if applied {
t.Fatal("expected base block to be skipped after WAL apply")
}
if err := bs.MarkReplicaRebuildBaseComplete(path, 7, 1); err != nil {
t.Fatalf("mark base complete: %v", err)
}
achieved, completed, err := bs.TryCompleteReplicaRebuildSession(path, 7)
if err != nil {
t.Fatalf("try complete: %v", err)
}
if !completed || achieved != 1 {
t.Fatalf("achieved=%d completed=%v, want achieved=1 completed=true", achieved, completed)
}
snap, ok, err = bs.ReplicaRebuildSession(path)
if err != nil {
t.Fatalf("read completed session: %v", err)
}
if !ok || !snap.Progress.Completed() {
t.Fatalf("session completed=%v ok=%v", snap.Progress.Completed(), ok)
}
if err := bs.CancelReplicaRebuildSession(path, 7, "test_done"); err != nil {
t.Fatalf("cancel rebuild session: %v", err)
}
if _, ok, err := bs.ReplicaRebuildSession(path); err != nil {
t.Fatalf("read cleared session: %v", err)
} else if ok {
t.Fatal("expected no active rebuild session after cancel")
}
}
func TestBlockService_ReplicaRebuildSessionSkeleton_RejectsStaleSessionID(t *testing.T) {
bs := newTestBlockServiceDirect(t)
path := createTestVolDirect(t, bs, "vol-rebuild-session-stale")
if err := bs.StartReplicaRebuildSession(path, blockvol.RebuildSessionConfig{
SessionID: 9,
Epoch: 1,
BaseLSN: 1,
TargetLSN: 1,
}); err != nil {
t.Fatalf("start rebuild session: %v", err)
}
err := bs.ApplyReplicaRebuildWALEntry(path, 8, &blockvol.WALEntry{
LSN: 1,
Epoch: 1,
Type: blockvol.EntryTypeWrite,
LBA: 0,
Length: 4096,
Data: bytes.Repeat([]byte{0xEF}, 4096),
})
if err == nil {
t.Fatal("expected stale session ID to be rejected")
}
}
func TestBlockService_BarrierRejected_ExecutesCoreInvalidateSession(t *testing.T) {
bs := newTestBlockServiceDirect(t)
bs.v2Bridge = newTestControlBridge()