mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-03 11:45:18 +00:00
(cherry picked from commit d36a5905a6)
Co-authored-by: Sam Kleinman <garen@tychoish.com>
This commit is contained in:
@@ -255,11 +255,16 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
|
||||
r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir)
|
||||
r.mtx.Unlock()
|
||||
|
||||
// Request snapshots from all currently connected peers
|
||||
r.Logger.Debug("Requesting snapshots from known peers")
|
||||
r.Switch.Broadcast(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
|
||||
hook := func() {
|
||||
r.Logger.Debug("Requesting snapshots from known peers")
|
||||
// Request snapshots from all currently connected peers
|
||||
r.Switch.Broadcast(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
|
||||
}
|
||||
|
||||
hook()
|
||||
|
||||
state, commit, err := r.syncer.SyncAny(discoveryTime, hook)
|
||||
|
||||
state, commit, err := r.syncer.SyncAny(discoveryTime)
|
||||
r.mtx.Lock()
|
||||
r.syncer = nil
|
||||
r.mtx.Unlock()
|
||||
|
||||
@@ -173,8 +173,8 @@ func (p *snapshotPool) Ranked() []*snapshot {
|
||||
defer p.Unlock()
|
||||
|
||||
candidates := make([]*snapshot, 0, len(p.snapshots))
|
||||
for _, snapshot := range p.snapshots {
|
||||
candidates = append(candidates, snapshot)
|
||||
for key := range p.snapshots {
|
||||
candidates = append(candidates, p.snapshots[key])
|
||||
}
|
||||
|
||||
sort.Slice(candidates, func(i, j int) bool {
|
||||
|
||||
@@ -24,6 +24,9 @@ const (
|
||||
chunkTimeout = 2 * time.Minute
|
||||
// requestTimeout is the timeout before rerequesting a chunk, possibly from a different peer.
|
||||
chunkRequestTimeout = 10 * time.Second
|
||||
// minimumDiscoveryTime is the lowest allowable time for a
|
||||
// SyncAny discovery time.
|
||||
minimumDiscoveryTime = 5 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -125,7 +128,11 @@ func (s *syncer) RemovePeer(peer p2p.Peer) {
|
||||
// SyncAny tries to sync any of the snapshots in the snapshot pool, waiting to discover further
|
||||
// snapshots if none were found and discoveryTime > 0. It returns the latest state and block commit
|
||||
// which the caller must use to bootstrap the node.
|
||||
func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, *types.Commit, error) {
|
||||
func (s *syncer) SyncAny(discoveryTime time.Duration, retryHook func()) (sm.State, *types.Commit, error) {
|
||||
if discoveryTime != 0 && discoveryTime < minimumDiscoveryTime {
|
||||
discoveryTime = 5 * minimumDiscoveryTime
|
||||
}
|
||||
|
||||
if discoveryTime > 0 {
|
||||
s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
|
||||
time.Sleep(discoveryTime)
|
||||
@@ -148,6 +155,7 @@ func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, *types.Commit,
|
||||
if discoveryTime == 0 {
|
||||
return sm.State{}, nil, errNoSnapshots
|
||||
}
|
||||
retryHook()
|
||||
s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
|
||||
time.Sleep(discoveryTime)
|
||||
continue
|
||||
|
||||
@@ -186,7 +186,7 @@ func TestSyncer_SyncAny(t *testing.T) {
|
||||
LastBlockAppHash: []byte("app_hash"),
|
||||
}, nil)
|
||||
|
||||
newState, lastCommit, err := syncer.SyncAny(0)
|
||||
newState, lastCommit, err := syncer.SyncAny(0, func() {})
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(50 * time.Millisecond) // wait for peers to receive requests
|
||||
@@ -210,7 +210,7 @@ func TestSyncer_SyncAny(t *testing.T) {
|
||||
|
||||
func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
|
||||
syncer, _ := setupOfferSyncer(t)
|
||||
_, _, err := syncer.SyncAny(0)
|
||||
_, _, err := syncer.SyncAny(0, func() {})
|
||||
assert.Equal(t, errNoSnapshots, err)
|
||||
}
|
||||
|
||||
@@ -224,7 +224,7 @@ func TestSyncer_SyncAny_abort(t *testing.T) {
|
||||
Snapshot: toABCI(s), AppHash: []byte("app_hash"),
|
||||
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
|
||||
|
||||
_, _, err = syncer.SyncAny(0)
|
||||
_, _, err = syncer.SyncAny(0, func() {})
|
||||
assert.Equal(t, errAbort, err)
|
||||
connSnapshot.AssertExpectations(t)
|
||||
}
|
||||
@@ -255,7 +255,7 @@ func TestSyncer_SyncAny_reject(t *testing.T) {
|
||||
Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
|
||||
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
|
||||
|
||||
_, _, err = syncer.SyncAny(0)
|
||||
_, _, err = syncer.SyncAny(0, func() {})
|
||||
assert.Equal(t, errNoSnapshots, err)
|
||||
connSnapshot.AssertExpectations(t)
|
||||
}
|
||||
@@ -282,7 +282,7 @@ func TestSyncer_SyncAny_reject_format(t *testing.T) {
|
||||
Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
|
||||
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)
|
||||
|
||||
_, _, err = syncer.SyncAny(0)
|
||||
_, _, err = syncer.SyncAny(0, func() {})
|
||||
assert.Equal(t, errAbort, err)
|
||||
connSnapshot.AssertExpectations(t)
|
||||
}
|
||||
@@ -320,7 +320,7 @@ func TestSyncer_SyncAny_reject_sender(t *testing.T) {
|
||||
Snapshot: toABCI(sa), AppHash: []byte("app_hash"),
|
||||
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)
|
||||
|
||||
_, _, err = syncer.SyncAny(0)
|
||||
_, _, err = syncer.SyncAny(0, func() {})
|
||||
assert.Equal(t, errNoSnapshots, err)
|
||||
connSnapshot.AssertExpectations(t)
|
||||
}
|
||||
@@ -336,7 +336,7 @@ func TestSyncer_SyncAny_abciError(t *testing.T) {
|
||||
Snapshot: toABCI(s), AppHash: []byte("app_hash"),
|
||||
}).Once().Return(nil, errBoom)
|
||||
|
||||
_, _, err = syncer.SyncAny(0)
|
||||
_, _, err = syncer.SyncAny(0, func() {})
|
||||
assert.True(t, errors.Is(err, errBoom))
|
||||
connSnapshot.AssertExpectations(t)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user