mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-19 19:22:52 +00:00
Compare commits
18 Commits
master
...
wb/statesy
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8b326810f5 | ||
|
|
3185eff8c6 | ||
|
|
51193fdb75 | ||
|
|
8a0cd32ecf | ||
|
|
6ccc2aef9e | ||
|
|
530814991f | ||
|
|
93719f4494 | ||
|
|
28eacd75c2 | ||
|
|
e28bf9b659 | ||
|
|
0d72e89ac4 | ||
|
|
3da97821a1 | ||
|
|
db44aeb62b | ||
|
|
a070c42777 | ||
|
|
f8cebbc5af | ||
|
|
022148613e | ||
|
|
becc6c3072 | ||
|
|
41384c6899 | ||
|
|
7679cc5015 |
@@ -280,6 +280,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
|
||||
}
|
||||
|
||||
if err := r.initStateProvider(ctx, r.chainID, r.initialHeight); err != nil {
|
||||
r.mtx.Unlock()
|
||||
return sm.State{}, err
|
||||
}
|
||||
|
||||
@@ -889,17 +890,20 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
|
||||
}
|
||||
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
if r.syncer == nil {
|
||||
r.mtx.Unlock()
|
||||
return
|
||||
}
|
||||
defer r.mtx.Unlock()
|
||||
|
||||
switch peerUpdate.Status {
|
||||
case p2p.PeerStatusUp:
|
||||
newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher)
|
||||
r.providers[peerUpdate.NodeID] = newProvider
|
||||
r.syncer.AddPeer(peerUpdate.NodeID)
|
||||
err := r.syncer.AddPeer(peerUpdate.NodeID)
|
||||
if err != nil {
|
||||
r.Logger.Error("error adding peer to syncer", "error", err)
|
||||
return
|
||||
}
|
||||
if sp, ok := r.stateProvider.(*stateProviderP2P); ok {
|
||||
// we do this in a separate routine to not block whilst waiting for the light client to finish
|
||||
// whatever call it's currently executing
|
||||
|
||||
@@ -141,7 +141,17 @@ func (s *syncer) AddSnapshot(peerID types.NodeID, snapshot *snapshot) (bool, err
|
||||
|
||||
// AddPeer adds a peer to the pool. For now we just keep it simple and send a
|
||||
// single request to discover snapshots, later we may want to do retries and stuff.
|
||||
func (s *syncer) AddPeer(peerID types.NodeID) {
|
||||
func (s *syncer) AddPeer(peerID types.NodeID) (err error) {
|
||||
defer func() {
|
||||
// TODO: remove panic recover once AddPeer can no longer accientally send on
|
||||
// closed channel.
|
||||
// This recover was added to protect against the p2p message being sent
|
||||
// to the snapshot channel after the snapshot channel was closed.
|
||||
if r := recover(); r != nil {
|
||||
err = fmt.Errorf("panic sending peer snapshot request: %v", r)
|
||||
}
|
||||
}()
|
||||
|
||||
s.logger.Debug("Requesting snapshots from peer", "peer", peerID)
|
||||
|
||||
msg := p2p.Envelope{
|
||||
@@ -153,6 +163,7 @@ func (s *syncer) AddPeer(peerID types.NodeID) {
|
||||
case <-s.closeCh:
|
||||
case s.snapshotCh <- msg:
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// RemovePeer removes a peer from the pool.
|
||||
|
||||
@@ -77,12 +77,14 @@ func TestSyncer_SyncAny(t *testing.T) {
|
||||
require.Error(t, err)
|
||||
|
||||
// Adding a couple of peers should trigger snapshot discovery messages
|
||||
rts.syncer.AddPeer(peerAID)
|
||||
err = rts.syncer.AddPeer(peerAID)
|
||||
require.NoError(t, err)
|
||||
e := <-rts.snapshotOutCh
|
||||
require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message)
|
||||
require.Equal(t, peerAID, e.To)
|
||||
|
||||
rts.syncer.AddPeer(peerBID)
|
||||
err = rts.syncer.AddPeer(peerBID)
|
||||
require.NoError(t, err)
|
||||
e = <-rts.snapshotOutCh
|
||||
require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message)
|
||||
require.Equal(t, peerBID, e.To)
|
||||
|
||||
Reference in New Issue
Block a user