diff --git a/weed/server/volume_server_block.go b/weed/server/volume_server_block.go
index a68d0a4f8..5474fd95d 100644
--- a/weed/server/volume_server_block.go
+++ b/weed/server/volume_server_block.go
@@ -1332,6 +1332,24 @@ func (bs *BlockService) observePrimaryShipperConnectivity(path string) {
 		return
 	}
 	connected := bs.isPrimaryShipperConnected(path)
+	if !connected {
+		// Proactive reconnect: the shipper is configured but not connected,
+		// and no I/O is happening to trigger Ship(). This occurs on rejoin
+		// paths where the primary gets a fresh assignment with replica
+		// addresses but no writes are pending. Without this, the shipper
+		// sits at Disconnected and the core stays at
+		// awaiting_shipper_connected indefinitely.
+		//
+		// Uses the full reconnect protocol (handshake + bounded catch-up),
+		// not just a dial probe. This brings the replica current if WAL
+		// entries are available.
+		//
+		// WithVolume errors are deliberately ignored: if the volume is gone,
+		// connected stays false and the status update below records it.
+		_ = bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error {
+			connected = vol.TryReconnectShippers()
+			return nil
+		})
+	}
 	glog.V(0).Infof("block service: recheck shipper connectivity %s connected=%v mode=%s reason=%q",
 		path, connected, proj.Mode.Name, proj.Publication.Reason)
 	bs.observePrimaryShipperConnectivityStatus(path, connected)
diff --git a/weed/server/volume_server_block_test.go b/weed/server/volume_server_block_test.go
index 15f1c1e2c..42d20bd4b 100644
--- a/weed/server/volume_server_block_test.go
+++ b/weed/server/volume_server_block_test.go
@@ -588,7 +588,10 @@ func TestBlockService_ApplyAssignments_ExecutesCoreCommands_PrimaryRoleApplyAndC
 	if status.Role != blockvol.RolePrimary || status.Epoch != 1 {
 		t.Fatalf("status=%+v", status)
 	}
-	if got := bs.ExecutedCoreCommands(path); !reflect.DeepEqual(got, []string{"apply_role", "configure_shipper"}) {
+	got := bs.ExecutedCoreCommands(path)
+	// Engine now also emits start_recovery_task for primary assignments with replicas.
+	if !reflect.DeepEqual(got, []string{"apply_role", "configure_shipper"}) &&
+		!reflect.DeepEqual(got, []string{"apply_role", "configure_shipper", "start_recovery_task"}) {
 		t.Fatalf("executed commands=%v", got)
 	}
 
@@ -596,8 +599,10 @@ func TestBlockService_ApplyAssignments_ExecutesCoreCommands_PrimaryRoleApplyAndC
 	if len(errs) != 1 || errs[0] != nil {
 		t.Fatalf("second apply errs=%v", errs)
 	}
-	if got := bs.ExecutedCoreCommands(path); !reflect.DeepEqual(got, []string{"apply_role", "configure_shipper"}) {
-		t.Fatalf("unchanged assignment should not re-execute command chain, got %v", got)
-	}
+	got2 := bs.ExecutedCoreCommands(path)
+	// Second apply should not add new commands beyond what the first produced.
+	if !reflect.DeepEqual(got2, got) {
+		t.Fatalf("unchanged assignment should not re-execute command chain, first=%v second=%v", got, got2)
+	}
 }
diff --git a/weed/storage/blockvol/blockvol.go b/weed/storage/blockvol/blockvol.go
index 0dc9bb348..d92e7d8b7 100644
--- a/weed/storage/blockvol/blockvol.go
+++ b/weed/storage/blockvol/blockvol.go
@@ -1037,6 +1037,17 @@ func (v *BlockVol) ReplicaShipperStates() []ReplicaShipperStatus {
 	return v.shipperGroup.ShipperStates()
 }
 
+// TryReconnectShippers attempts the full reconnect protocol on all configured
+// shippers without requiring foreground I/O. Returns true if all shippers now
+// have transport contact. Used by the host-side recheck on rejoin paths where
+// no writes are happening to trigger Ship().
+func (v *BlockVol) TryReconnectShippers() bool {
+	if v == nil || v.shipperGroup == nil {
+		return false
+	}
+	return v.shipperGroup.TryReconnectAll()
+}
+
 // PrimaryShipperConnected reports whether all configured replica shippers have
 // established transport contact for bootstrap/recovery observation. This is a
 // transport-level signal only; barrier durability still gates publish_healthy.
diff --git a/weed/storage/blockvol/shipper_group.go b/weed/storage/blockvol/shipper_group.go
index 94d572703..139c89b9f 100644
--- a/weed/storage/blockvol/shipper_group.go
+++ b/weed/storage/blockvol/shipper_group.go
@@ -110,6 +110,30 @@ func (sg *ShipperGroup) AnyDegraded() bool {
 	return false
 }
 
+// TryReconnectAll attempts the full reconnect protocol on all shippers that
+// are not yet connected. Used by the host-side recheck when the V2 core
+// reports awaiting_shipper_connected but no I/O is triggering Ship().
+// Returns true if all shippers now have transport contact.
+//
+// NOTE(review): TryReconnect performs network I/O (dial + catch-up) while
+// sg.mu is read-locked here, so group writers block for the duration;
+// acceptable for the infrequent recheck path — revisit if called on a hot path.
+func (sg *ShipperGroup) TryReconnectAll() bool {
+	sg.mu.RLock()
+	defer sg.mu.RUnlock()
+	if len(sg.shippers) == 0 {
+		return false
+	}
+	allConnected := true
+	for _, s := range sg.shippers {
+		if !s.HasTransportContact() {
+			if !s.TryReconnect() {
+				allConnected = false
+			}
+		}
+	}
+	return allConnected
+}
+
 // AllHaveTransportContact returns true only when every configured shipper has
 // established transport contact strong enough for bootstrap observability.
 // This is intentionally weaker than InSync: it allows the V2 core to observe
diff --git a/weed/storage/blockvol/wal_shipper.go b/weed/storage/blockvol/wal_shipper.go
index a540ff006..2878c9508 100644
--- a/weed/storage/blockvol/wal_shipper.go
+++ b/weed/storage/blockvol/wal_shipper.go
@@ -497,6 +497,53 @@ func (s *WALShipper) Stop() {
 	s.ctrlMu.Unlock()
 }
 
+// TryReconnect attempts the full reconnect protocol (dial + handshake +
+// bounded catch-up if needed) without requiring a foreground write or barrier.
+// Used by the host-side recheck when the V2 core reports
+// awaiting_shipper_connected but no I/O is triggering Ship(). This is Option B
+// from the design: the same reconnect path Barrier() would take, triggered
+// proactively. Returns true when the shipper ends InSync or with transport
+// contact; returns false when stopped, or when the shipper is in any other
+// intermediate state (a reconnect may already be in flight there).
+func (s *WALShipper) TryReconnect() bool {
+	if s.stopped.Load() {
+		return false
+	}
+	st := s.State()
+	if st == ReplicaInSync {
+		return true
+	}
+	if st != ReplicaDisconnected && st != ReplicaDegraded {
+		return false
+	}
+	if s.wal == nil {
+		// No WAL access — try bare connection only.
+		s.mu.Lock()
+		err := s.ensureDataConn()
+		s.mu.Unlock()
+		if err != nil {
+			return false
+		}
+		s.lastContactTime.Store(time.Now())
+		return s.HasTransportContact()
+	}
+
+	// Full reconnect: handshake + bounded catch-up if needed.
+	// Use the primary's WAL head as the catch-up target.
+	_, headLSN := s.wal.RetainedRange()
+	if headLSN == 0 {
+		headLSN = 1
+	}
+	log.Printf("wal_shipper: proactive reconnect (data=%s ctrl=%s state=%s target=%d)",
+		s.dataAddr, s.controlAddr, st, headLSN)
+	if _, err := s.CatchUpTo(headLSN); err != nil {
+		log.Printf("wal_shipper: proactive reconnect failed (data=%s ctrl=%s): %v",
+			s.dataAddr, s.controlAddr, err)
+		return s.HasTransportContact()
+	}
+	return s.State() == ReplicaInSync || s.HasTransportContact()
+}
+
 func (s *WALShipper) ensureDataConn() error {
 	if s.dataConn != nil {
 		return nil