From aeb6058210ce11f6c146ebc5b391de23dd989937 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Thu, 12 Aug 2021 06:04:06 +0200 Subject: [PATCH] p2p state provider --- internal/statesync/dispatcher.go | 33 ++++++-------- internal/statesync/dispatcher_test.go | 5 --- internal/statesync/reactor.go | 65 ++++++++++++++++++--------- internal/statesync/reactor_test.go | 7 +-- internal/statesync/syncer.go | 3 ++ test/e2e/networks/simple.toml | 6 +-- 6 files changed, 67 insertions(+), 52 deletions(-) diff --git a/internal/statesync/dispatcher.go b/internal/statesync/dispatcher.go index 74752a505..eff58c5f0 100644 --- a/internal/statesync/dispatcher.go +++ b/internal/statesync/dispatcher.go @@ -48,6 +48,7 @@ func newDispatcher(requestCh chan<- p2p.Envelope, timeout time.Duration) *dispat requestCh: requestCh, providers: make(map[types.NodeID]struct{}), calls: make(map[types.NodeID]chan *types.LightBlock), + running: true, } } @@ -69,8 +70,10 @@ func (d *dispatcher) LightBlock(ctx context.Context, height int64) (*types.Light // fetch the next peer id in the list and request a light block from that // peer peer := d.availablePeers.Pop(ctx) - defer d.release(peer) lb, err := d.lightBlock(ctx, height, peer) + + // append the peer back to the list + d.availablePeers.Append(peer) return lb, peer, err } @@ -111,12 +114,6 @@ func (d *dispatcher) stop() { } } -func (d *dispatcher) start() { - d.mtx.Lock() - defer d.mtx.Unlock() - d.running = true -} - func (d *dispatcher) lightBlock(ctx context.Context, height int64, peer types.NodeID) (*types.LightBlock, error) { // dispatch the request to the peer callCh, err := d.dispatch(peer, height) @@ -235,14 +232,6 @@ func (d *dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.Ligh return ch, nil } -// release appends the peer back to the list and deletes the allocated call so -// that a new call can be made to that peer -func (d *dispatcher) release(peer types.NodeID) { - d.mtx.Lock() - defer d.mtx.Unlock() - d.availablePeers.Append(peer) -} - //---------------------------------------------------------------- // blockProvider is a p2p based light provider which uses a dispatcher connected @@ -266,12 +255,18 @@ func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.Li ctx, cancel := context.WithTimeout(ctx, p.timeout) defer cancel() lb, err := p.dispatcher.lightBlock(ctx, height, p.peer) - if err != nil { + switch err { + case nil: + if lb == nil { + return nil, provider.ErrLightBlockNotFound + } + case context.DeadlineExceeded, context.Canceled: + return nil, err + case errNoResponse: + return nil, provider.ErrNoResponse + default: return nil, provider.ErrUnreliableProvider{Reason: err.Error()} } - if lb == nil { - return nil, provider.ErrLightBlockNotFound - } if err := lb.ValidateBasic(p.chainID); err != nil { return nil, provider.ErrBadLightBlock{Reason: err} diff --git a/internal/statesync/dispatcher_test.go b/internal/statesync/dispatcher_test.go index 8da29018e..6ada43f57 100644 --- a/internal/statesync/dispatcher_test.go +++ b/internal/statesync/dispatcher_test.go @@ -25,7 +25,6 @@ func TestDispatcherBasic(t *testing.T) { defer close(closeCh) d := newDispatcher(ch, 1*time.Second) - d.start() go handleRequests(t, d, ch, closeCh) peers := createPeerSet(5) @@ -58,7 +57,6 @@ func TestDispatcherReturnsNoBlock(t *testing.T) { t.Cleanup(leaktest.Check(t)) ch := make(chan p2p.Envelope, 100) d := newDispatcher(ch, 1*time.Second) - d.start() peerFromSet := createPeerSet(1)[0] d.addPeer(peerFromSet) doneCh := make(chan struct{}) @@ -81,7 +79,6 @@ func TestDispatcherErrorsWhenNoPeers(t *testing.T) { t.Cleanup(leaktest.Check(t)) ch := make(chan p2p.Envelope, 100) d := newDispatcher(ch, 1*time.Second) - d.start() lb, peerResult, err := d.LightBlock(context.Background(), 1) @@ -94,7 +91,6 @@ func TestDispatcherReturnsBlockOncePeerAvailable(t *testing.T) { t.Cleanup(leaktest.Check(t)) dispatcherRequestCh := make(chan p2p.Envelope, 100) d := newDispatcher(dispatcherRequestCh, 1*time.Second) - d.start() peerFromSet := createPeerSet(1)[0] d.addPeer(peerFromSet) @@ -142,7 +138,6 @@ func TestDispatcherProviders(t *testing.T) { defer close(closeCh) d := newDispatcher(ch, 5*time.Second) - d.start() go handleRequests(t, d, ch, closeCh) peers := createPeerSet(5) diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 906a7a17f..2a6f6fa03 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -69,6 +69,17 @@ var ( MaxSendBytes: 400, }, }, + ParamsChannel: { + MsgType: new(ssproto.Message), + Descriptor: &p2p.ChannelDescriptor{ + ID: byte(ParamsChannel), + Priority: 2, + SendQueueCapacity: 10, + RecvMessageCapacity: paramMsgSize, + RecvBufferCapacity: 128, + MaxSendBytes: 400, + }, + }, } ) @@ -97,6 +108,9 @@ const ( // lightBlockMsgSize is the maximum size of a lightBlockResponseMessage lightBlockMsgSize = int(1e7) // ~10MB + // paramMsgSize is the maximum size of a paramsResponseMessage + paramMsgSize = int(1e5) // ~100kb + // lightBlockResponseTimeout is how long the dispatcher waits for a peer to // return a light block lightBlockResponseTimeout = 30 * time.Second @@ -129,12 +143,12 @@ type Reactor struct { peerUpdates *p2p.PeerUpdates closeCh chan struct{} + // These will only be set when a state sync is in progress. It is used to feed + // received snapshots and chunks into the sync. And to fetch light blocks and + // consensus params for verification and building of tendermint state. + mtx tmsync.RWMutex + syncer *syncer dispatcher *dispatcher - - // This will only be set when a state sync is in progress. It is used to feed - // received snapshots and chunks into the sync. - mtx tmsync.RWMutex - syncer *syncer } // NewReactor returns a reference to a new state sync reactor, which implements @@ -194,8 +208,6 @@ func (r *Reactor) OnStart() error { go r.processPeerUpdates() - r.dispatcher.start() - return nil } @@ -229,12 +241,12 @@ func (r *Reactor) Sync( initialHeight int64, ) (sm.State, error) { r.mtx.Lock() - if r.syncer != nil { + if r.syncer != nil || r.dispatcher != nil { r.mtx.Unlock() return sm.State{}, errors.New("a state sync is already in progress") } - r.dispatcher = newDispatcher(r.blockCh.Out, lightBlockResponseTimeout) + r.mtx.Unlock() to := light.TrustOptions{ Period: r.cfg.TrustPeriod, @@ -263,6 +275,7 @@ func (r *Reactor) Sync( } } + r.mtx.Lock() r.syncer = newSyncer( r.cfg, r.Logger, @@ -274,6 +287,12 @@ func (r *Reactor) Sync( r.tempDir, ) r.mtx.Unlock() + defer func() { + r.mtx.Lock() + r.syncer = nil + r.dispatcher = nil + r.mtx.Unlock() + }() requestSnapshotsHook := func() { // request snapshots from all currently connected peers @@ -288,10 +307,6 @@ func (r *Reactor) Sync( return sm.State{}, err } - r.mtx.Lock() - r.syncer = nil - r.mtx.Unlock() - err = r.stateStore.Bootstrap(state) if err != nil { return sm.State{}, fmt.Errorf("failed to bootstrap node with new state: %w", err) @@ -348,7 +363,7 @@ func (r *Reactor) backfill( const sleepTime = 1 * time.Second var ( lastValidatorSet *types.ValidatorSet - lastChangeHeight int64 = startHeight + lastChangeHeight = startHeight ) queue := newBlockQueue(startHeight, stopHeight, initialHeight, stopTime, maxLightBlockRequestRetries) @@ -527,7 +542,7 @@ func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error { return nil } - logger.Debug("received snapshot", "height", msg.Height, "format", msg.Format) + logger.Info("received snapshot", "height", msg.Height, "format", msg.Format) _, err := r.syncer.AddSnapshot(envelope.From, &snapshot{ Height: msg.Height, Format: msg.Format, @@ -669,8 +684,11 @@ func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error { } case *ssproto.LightBlockResponse: - if err := r.dispatcher.respond(msg.LightBlock, envelope.From); err != nil { - r.Logger.Error("error processing light block response", "err", err) + r.Logger.Info("received light block response") + if r.dispatcher != nil { + if err := r.dispatcher.respond(msg.LightBlock, envelope.From); err != nil { + r.Logger.Error("error processing light block response", "err", err) + } } default: @@ -815,22 +833,25 @@ func (r *Reactor) processCh(ch *p2p.Channel, chName string) { // handle the PeerUpdate or if a panic is recovered. func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { r.Logger.Info("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) - - r.mtx.RLock() - defer r.mtx.RUnlock() + r.mtx.Lock() + defer r.mtx.Unlock() switch peerUpdate.Status { case p2p.PeerStatusUp: + if r.dispatcher != nil { + r.dispatcher.addPeer(peerUpdate.NodeID) + } if r.syncer != nil { r.syncer.AddPeer(peerUpdate.NodeID) } - r.dispatcher.addPeer(peerUpdate.NodeID) case p2p.PeerStatusDown: + if r.dispatcher != nil { + r.dispatcher.removePeer(peerUpdate.NodeID) + } if r.syncer != nil { r.syncer.RemovePeer(peerUpdate.NodeID) } - r.dispatcher.removePeer(peerUpdate.NodeID) } r.Logger.Info("processed peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) } diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index 36482872d..6a73449ae 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -472,7 +472,7 @@ func TestReactor_P2P_Provider(t *testing.T) { Hash: lb.Hash(), } - p2pStateProvider, err := NewP2PStateProvider(ctx, "test-chain", 1, rts.reactor.dispatcher, + p2pStateProvider, err := NewP2PStateProvider(ctx, "test-chain", 1, rts.reactor.dispatcher, to, rts.reactor.paramsCh.Out, log.TestingLogger()) require.NoError(t, err) // set the state provider else the test won't think we are state syncing @@ -661,8 +661,9 @@ func buildLightBlockChain(t *testing.T, fromHeight, toHeight int64, startTime ti return chain } -func mockLB(t *testing.T, height int64, time time.Time, - lastBlockID types.BlockID, currentVals *types.ValidatorSet, currentPrivVals []types.PrivValidator) (*types.ValidatorSet, []types.PrivValidator, *types.LightBlock) { +func mockLB(t *testing.T, height int64, time time.Time, lastBlockID types.BlockID, + currentVals *types.ValidatorSet, currentPrivVals []types.PrivValidator, +) (*types.ValidatorSet, []types.PrivValidator, *types.LightBlock) { header, err := factory.MakeHeader(&types.Header{ Height: height, LastBlockID: lastBlockID, diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go index 5dc8aeb8c..c1d0522be 100644 --- a/internal/statesync/syncer.go +++ b/internal/statesync/syncer.go @@ -137,6 +137,9 @@ func (s *syncer) AddPeer(peerID types.NodeID) { To: peerID, Message: &ssproto.SnapshotsRequest{}, } + if stateP2Pprovider, ok := s.stateProvider.(*stateProviderP2P); ok { + stateP2Pprovider.addPeer(peerID) + } } // RemovePeer removes a peer from the pool. diff --git a/test/e2e/networks/simple.toml b/test/e2e/networks/simple.toml index b47775b2b..e98514188 100644 --- a/test/e2e/networks/simple.toml +++ b/test/e2e/networks/simple.toml @@ -5,8 +5,8 @@ snapshot_interval = 3 [node.validator03] snapshot_interval = 3 [node.validator04] +snapshot_interval =3 +[node.validator05] state_sync = true start_at = 5 -persistent_peers = ["validator01"] -[node.validator05] -snapshot_interval =3 \ No newline at end of file +persistent_peers = ["validator01", "validator02"] \ No newline at end of file