From d44d281d800b7b381c088069b8fe774c4acf4547 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 24 Aug 2021 18:35:39 +0200 Subject: [PATCH] refactor dispatcher --- internal/statesync/dispatcher.go | 242 +++++++++----------------- internal/statesync/dispatcher_test.go | 126 ++++---------- internal/statesync/reactor.go | 42 +++-- internal/statesync/reactor_test.go | 27 ++- internal/statesync/stateprovider.go | 10 +- internal/statesync/syncer.go | 3 - light/client_test.go | 4 - 7 files changed, 161 insertions(+), 293 deletions(-) diff --git a/internal/statesync/dispatcher.go b/internal/statesync/dispatcher.go index d70a8980b..1e94eabe5 100644 --- a/internal/statesync/dispatcher.go +++ b/internal/statesync/dispatcher.go @@ -19,111 +19,44 @@ var ( errUnsolicitedResponse = errors.New("unsolicited light block response") errNoResponse = errors.New("peer failed to respond within timeout") errPeerAlreadyBusy = errors.New("peer is already processing a request") - errDisconnected = errors.New("dispatcher has been disconnected") + errDisconnected = errors.New("provider has been disconnected") ) -// dispatcher keeps a list of peers and allows concurrent requests for light -// blocks. NOTE: It is not the responsibility of the dispatcher to verify the -// light blocks. -type dispatcher struct { - // a pool of peers to send light block request too - availablePeers *peerlist - requestCh chan<- p2p.Envelope +// dispatcher multiplexes concurrent requests by multiple peers for light blocks. +// Only one request per peer can be sent at a time +// NOTE: It is not the responsibility of the dispatcher to verify the light blocks. +type Dispatcher struct { + // the channel with which to send light block requests on + requestCh chan<- p2p.Envelope // timeout for light block delivery (immutable) timeout time.Duration mtx sync.Mutex - // the set of providers that the dispatcher is providing for (is distinct - // from available peers) - providers map[types.NodeID]struct{} // all pending calls that have been dispatched and are awaiting an answer calls map[types.NodeID]chan *types.LightBlock // signals whether the underlying reactor is still running running bool } -func newDispatcher(requestCh chan<- p2p.Envelope, timeout time.Duration) *dispatcher { - return &dispatcher{ - availablePeers: newPeerList(), - timeout: timeout, - requestCh: requestCh, - providers: make(map[types.NodeID]struct{}), - calls: make(map[types.NodeID]chan *types.LightBlock), - running: true, +func NewDispatcher(requestCh chan<- p2p.Envelope, timeout time.Duration) *Dispatcher { + return &Dispatcher{ + timeout: timeout, + requestCh: requestCh, + calls: make(map[types.NodeID]chan *types.LightBlock), + running: true, } } // LightBlock uses the request channel to fetch a light block from the next peer // in a list, tracks the call and waits for the reactor to pass along the response -func (d *dispatcher) LightBlock(ctx context.Context, height int64) (*types.LightBlock, types.NodeID, error) { - d.mtx.Lock() - // check that the dispatcher is connected to the reactor - if !d.running { - d.mtx.Unlock() - return nil, "", errDisconnected - } - // check to see that the dispatcher is connected to at least one peer - if d.availablePeers.Len() == 0 && len(d.calls) == 0 { - d.mtx.Unlock() - return nil, "", errNoConnectedPeers - } - d.mtx.Unlock() - - // fetch the next peer id in the list and request a light block from that - // peer - peer := d.availablePeers.Pop(ctx) - - lb, err := d.lightBlock(ctx, height, peer) - - // append the peer back to the list - 
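// Editorial sketch, not part of the patch: minimal use of the refactored
// Dispatcher introduced in this hunk. The caller now chooses the peer, and at
// most one request per peer may be in flight. requestCh, peerID and height are
// assumed names; the snippet relies on the statesync package's existing imports.
func exampleDispatcherUsage(requestCh chan<- p2p.Envelope, peerID types.NodeID, height int64) (*types.LightBlock, error) {
	d := NewDispatcher(requestCh, 10*time.Second)
	defer d.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Blocks until the reactor feeds a response back via d.Respond, the
	// dispatcher's timeout elapses, or the context is cancelled.
	return d.LightBlock(ctx, height, peerID)
}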
d.availablePeers.Append(peer) - return lb, peer, err -} - -// Providers turns the dispatcher into a set of providers (per peer) which can -// be used by a light client -func (d *dispatcher) Providers(chainID string) []provider.Provider { - providers := make([]provider.Provider, d.availablePeers.Len()) - for i := 0; i < cap(providers); i++ { - peer := d.availablePeers.Pop(context.Background()) - providers[i] = d.CreateProvider(peer, chainID) - } - return providers -} - -// Creates an individual provider from a peer id that the dispatcher is -// connected with. -func (d *dispatcher) CreateProvider(peer types.NodeID, chainID string) provider.Provider { - d.mtx.Lock() - defer d.mtx.Unlock() - - d.availablePeers.Remove(peer) - d.providers[peer] = struct{}{} - return &blockProvider{ - peer: peer, - dispatcher: d, - chainID: chainID, - timeout: d.timeout, - } -} - -func (d *dispatcher) stop() { - d.mtx.Lock() - defer d.mtx.Unlock() - d.running = false - for peer, call := range d.calls { - delete(d.calls, peer) - close(call) - } -} - -func (d *dispatcher) lightBlock(ctx context.Context, height int64, peer types.NodeID) (*types.LightBlock, error) { +func (d *Dispatcher) LightBlock(ctx context.Context, height int64, peer types.NodeID) (*types.LightBlock, error) { // dispatch the request to the peer callCh, err := d.dispatch(peer, height) if err != nil { return nil, err } + // clean up the call after a response is returned defer func() { d.mtx.Lock() defer d.mtx.Unlock() @@ -147,9 +80,41 @@ func (d *dispatcher) lightBlock(ctx context.Context, height int64, peer types.No } } +// dispatch takes a peer and allocates it a channel so long as it's not already +// busy and the receiving channel is still running. It then dispatches the message +func (d *Dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.LightBlock, error) { + d.mtx.Lock() + defer d.mtx.Unlock() + ch := make(chan *types.LightBlock, 1) + + // check if the dispatcher is running or not + if !d.running { + close(ch) + return ch, errDisconnected + } + + // check if a request for the same peer has already been made + if _, ok := d.calls[peer]; ok { + close(ch) + return ch, errPeerAlreadyBusy + } + d.calls[peer] = ch + + // send request + fmt.Printf("sending request dispatch, height %d peer %v\n", height, peer) + d.requestCh <- p2p.Envelope{ + To: peer, + Message: &ssproto.LightBlockRequest{ + Height: uint64(height), + }, + } + fmt.Printf("sent request dispatch, height %d peer %v\n", height, peer) + return ch, nil +} + // respond allows the underlying process which receives requests on the // requestCh to respond with the respective light block -func (d *dispatcher) respond(lb *proto.LightBlock, peer types.NodeID) error { +func (d *Dispatcher) Respond(lb *proto.LightBlock, peer types.NodeID) error { fmt.Printf("trying to respond with light block for height %d from %v\n", lb.SignedHeader.Header.Height, peer) d.mtx.Lock() defer d.mtx.Unlock() @@ -169,7 +134,6 @@ func (d *dispatcher) respond(lb *proto.LightBlock, peer types.NodeID) error { block, err := types.LightBlockFromProto(lb) if err != nil { - answerCh <- nil return err } @@ -177,67 +141,14 @@ func (d *dispatcher) respond(lb *proto.LightBlock, peer types.NodeID) error { return nil } -// addPeer adds a peer to the dispatcher -func (d *dispatcher) addPeer(peer types.NodeID) { - d.availablePeers.Append(peer) -} - -// removePeer removes a peer from the dispatcher -func (d *dispatcher) removePeer(peer types.NodeID) { +func (d *Dispatcher) Stop() { d.mtx.Lock() defer d.mtx.Unlock() - 
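// Editorial sketch, not part of the patch: the receiving side the Dispatcher
// relies on. Whatever drains requestCh (the statesync reactor in this codebase)
// must route each LightBlockResponse back through Respond so the pending
// LightBlock call for that peer unblocks; a response from a peer with no
// pending call yields errUnsolicitedResponse. Names are placeholders.
func exampleRouteResponse(d *Dispatcher, envelope p2p.Envelope) {
	if msg, ok := envelope.Message.(*ssproto.LightBlockResponse); ok {
		if err := d.Respond(msg.LightBlock, envelope.From); err != nil {
			// e.g. errUnsolicitedResponse: no call was pending for envelope.From
			_ = err
		}
	}
}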
if call, ok := d.calls[peer]; ok { - call <- nil - close(call) + d.running = false + for peer, call := range d.calls { delete(d.calls, peer) - } else { - d.availablePeers.Remove(peer) + close(call) } - delete(d.providers, peer) -} - -// peerCount returns the amount of peers that the dispatcher is connected with -func (d *dispatcher) peerCount() int { - return d.availablePeers.Len() -} - -func (d *dispatcher) isConnected(peer types.NodeID) bool { - d.mtx.Lock() - defer d.mtx.Unlock() - _, ok := d.providers[peer] - return ok -} - -// dispatch takes a peer and allocates it a channel so long as it's not already -// busy and the receiving channel is still running. It then dispatches the message -func (d *dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.LightBlock, error) { - d.mtx.Lock() - defer d.mtx.Unlock() - ch := make(chan *types.LightBlock, 1) - - // check if the dispatcher is running or not - if !d.running { - close(ch) - return ch, errDisconnected - } - - // this should happen only if we add the same peer twice (somehow) - if _, ok := d.calls[peer]; ok { - close(ch) - return ch, errPeerAlreadyBusy - } - d.calls[peer] = ch - - // send request - fmt.Printf("sending request dispatch, height %d peer %v\n", height, peer) - d.requestCh <- p2p.Envelope{ - To: peer, - Message: &ssproto.LightBlockRequest{ - Height: uint64(height), - }, - } - fmt.Printf("sent request dispatch, height %d peer %v\n", height, peer) - return ch, nil } //---------------------------------------------------------------- @@ -250,20 +161,23 @@ func (d *dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.Ligh type blockProvider struct { peer types.NodeID chainID string - timeout time.Duration - dispatcher *dispatcher + dispatcher *Dispatcher } -func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.LightBlock, error) { - // check if the underlying reactor is still connected with the peer - if !p.dispatcher.isConnected(p.peer) { - return nil, provider.ErrConnectionClosed +// Creates a block provider which implements the light client Provider interface. +func NewBlockProvider(peer types.NodeID, chainID string, dispatcher *Dispatcher) *blockProvider { + return &blockProvider{ + peer: peer, + chainID: chainID, + dispatcher: dispatcher, } - fmt.Println("fetching block for block provider") +} - ctx, cancel := context.WithTimeout(ctx, p.timeout) - defer cancel() - lb, err := p.dispatcher.lightBlock(ctx, height, p.peer) +// LightBlock fetches a light block from the peer at a specified height returning either a light block +// or an appropriate error. 
Concurrently unsafe +func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.LightBlock, error) { + fmt.Println("fetching block for block provider") + lb, err := p.dispatcher.LightBlock(ctx, height, p.peer) switch err { case nil: if lb == nil { @@ -271,12 +185,22 @@ func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.Li } case context.DeadlineExceeded, context.Canceled: return nil, err + case errPeerAlreadyBusy: + return nil, provider.ErrLightBlockNotFound case errNoResponse: return nil, provider.ErrNoResponse default: return nil, provider.ErrUnreliableProvider{Reason: err.Error()} } + // check that the height requested is the same one returned + if lb.Height != height { + return nil, provider.ErrBadLightBlock{ + Reason: fmt.Errorf("expected height %d, got height %d", height, lb.Height), + } + } + + // perform basic validation if err := lb.ValidateBasic(p.chainID); err != nil { return nil, provider.ErrBadLightBlock{Reason: err} } @@ -299,26 +223,26 @@ func (p *blockProvider) String() string { return string(p.peer) } // peerList is a rolling list of peers. This is used to distribute the load of // retrieving blocks over all the peers the reactor is connected to -type peerlist struct { +type peerList struct { mtx sync.Mutex peers []types.NodeID waiting []chan types.NodeID } -func newPeerList() *peerlist { - return &peerlist{ +func newPeerList() *peerList { + return &peerList{ peers: make([]types.NodeID, 0), waiting: make([]chan types.NodeID, 0), } } -func (l *peerlist) Len() int { +func (l *peerList) Len() int { l.mtx.Lock() defer l.mtx.Unlock() return len(l.peers) } -func (l *peerlist) Pop(ctx context.Context) types.NodeID { +func (l *peerList) Pop(ctx context.Context) types.NodeID { l.mtx.Lock() if len(l.peers) == 0 { // if we don't have any peers in the list we block until a peer is @@ -342,7 +266,7 @@ func (l *peerlist) Pop(ctx context.Context) types.NodeID { return peer } -func (l *peerlist) Append(peer types.NodeID) { +func (l *peerList) Append(peer types.NodeID) { l.mtx.Lock() defer l.mtx.Unlock() if len(l.waiting) > 0 { @@ -355,7 +279,7 @@ func (l *peerlist) Append(peer types.NodeID) { } } -func (l *peerlist) Remove(peer types.NodeID) { +func (l *peerList) Remove(peer types.NodeID) { l.mtx.Lock() defer l.mtx.Unlock() for i, p := range l.peers { @@ -366,7 +290,7 @@ func (l *peerlist) Remove(peer types.NodeID) { } } -func (l *peerlist) Peers() []types.NodeID { +func (l *peerList) All() []types.NodeID { l.mtx.Lock() defer l.mtx.Unlock() return l.peers diff --git a/internal/statesync/dispatcher_test.go b/internal/statesync/dispatcher_test.go index e94341ce4..4072d9a28 100644 --- a/internal/statesync/dispatcher_test.go +++ b/internal/statesync/dispatcher_test.go @@ -17,6 +17,10 @@ import ( "github.com/tendermint/tendermint/types" ) +var ( + peer, _ = types.NewNodeID(strings.Repeat("a", 2*types.NodeIDByteLength)) +) + func TestDispatcherBasic(t *testing.T) { t.Cleanup(leaktest.Check(t)) @@ -24,14 +28,10 @@ func TestDispatcherBasic(t *testing.T) { closeCh := make(chan struct{}) defer close(closeCh) - d := newDispatcher(ch, 1*time.Second) + d := NewDispatcher(ch, 1*time.Second) go handleRequests(t, d, ch, closeCh) peers := createPeerSet(5) - for _, peer := range peers { - d.addPeer(peer) - } - wg := sync.WaitGroup{} // make a bunch of async requests and require that the correct responses are @@ -40,49 +40,48 @@ func TestDispatcherBasic(t *testing.T) { wg.Add(1) go func(height int64) { defer wg.Done() - lb, peer, err := 
d.LightBlock(context.Background(), height) + lb, err := d.LightBlock(context.Background(), height, peers[i]) require.NoError(t, err) require.NotNil(t, lb) require.Equal(t, lb.Height, height) - require.Contains(t, peers, peer) }(int64(i)) } wg.Wait() - // we should finish with as many peers as we started out with - assert.Equal(t, 5, d.peerCount()) + // assert that all calls were responded to + assert.Empty(t, d.calls) } func TestDispatcherReturnsNoBlock(t *testing.T) { t.Cleanup(leaktest.Check(t)) ch := make(chan p2p.Envelope, 100) - d := newDispatcher(ch, 1*time.Second) - peerFromSet := createPeerSet(1)[0] - d.addPeer(peerFromSet) + d := NewDispatcher(ch, 1*time.Second) doneCh := make(chan struct{}) go func() { <-ch - require.NoError(t, d.respond(nil, peerFromSet)) + require.NoError(t, d.Respond(nil, peer)) close(doneCh) }() - lb, peerResult, err := d.LightBlock(context.Background(), 1) + lb, err := d.LightBlock(context.Background(), 1, peer) <-doneCh require.Nil(t, lb) require.Nil(t, err) - require.Equal(t, peerFromSet, peerResult) } func TestBlockProviderTimeOutWaitingOnLightBlock(t *testing.T) { t.Cleanup(leaktest.Check(t)) ch := make(chan p2p.Envelope, 100) - d := newDispatcher(ch, 1*time.Second) - peerFromSet := createPeerSet(1)[0] - d.addPeer(peerFromSet) - p := d.CreateProvider(peerFromSet, "test-chain") - lb, err := p.LightBlock(context.Background(), 1) + d := NewDispatcher(ch, 1*time.Second) + + closeCh := make(chan struct{}) + defer close(closeCh) + go handleRequests(t, d, ch, closeCh) + + provider := NewBlockProvider(peer, "my-chain", d) + lb, err := provider.LightBlock(context.Background(), 1) require.NoError(t, err) require.NotNil(t, lb) } @@ -90,72 +89,16 @@ func TestBlockProviderTimeOutWaitingOnLightBlock(t *testing.T) { func TestDispatcherTimeOutWaitingOnLightBlock(t *testing.T) { t.Cleanup(leaktest.Check(t)) ch := make(chan p2p.Envelope, 100) - d := newDispatcher(ch, 1*time.Second) - peerFromSet := createPeerSet(1)[0] - d.addPeer(peerFromSet) + d := NewDispatcher(ch, 1*time.Second) + ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancelFunc() - lb, peerResult, err := d.LightBlock(ctx, 1) + lb, err := d.LightBlock(ctx, 1, peer) require.Error(t, err) require.Equal(t, context.DeadlineExceeded, err) require.Nil(t, lb) - require.Equal(t, peerFromSet, peerResult) -} - -func TestDispatcherErrorsWhenNoPeers(t *testing.T) { - t.Cleanup(leaktest.Check(t)) - ch := make(chan p2p.Envelope, 100) - d := newDispatcher(ch, 1*time.Second) - - lb, peerResult, err := d.LightBlock(context.Background(), 1) - - require.Nil(t, lb) - require.Empty(t, peerResult) - require.Equal(t, errNoConnectedPeers, err) -} - -func TestDispatcherReturnsBlockOncePeerAvailable(t *testing.T) { - t.Cleanup(leaktest.Check(t)) - dispatcherRequestCh := make(chan p2p.Envelope, 100) - d := newDispatcher(dispatcherRequestCh, 1*time.Second) - - peerFromSet := createPeerSet(1)[0] - d.addPeer(peerFromSet) - ctx := context.Background() - wrapped, cancelFunc := context.WithCancel(ctx) - - doneCh := make(chan struct{}) - go func() { - lb, peerResult, err := d.LightBlock(wrapped, 1) - require.Nil(t, lb) - require.Equal(t, peerFromSet, peerResult) - require.Equal(t, context.Canceled, err) - - // calls to dispatcher.Lightblock write into the dispatcher's requestCh. - // we read from the requestCh here to unblock the requestCh for future - // calls. 
- <-dispatcherRequestCh - close(doneCh) - }() - cancelFunc() - <-doneCh - - go func() { - <-dispatcherRequestCh - lb := &types.LightBlock{} - asProto, err := lb.ToProto() - require.Nil(t, err) - err = d.respond(asProto, peerFromSet) - require.Nil(t, err) - }() - - lb, peerResult, err := d.LightBlock(context.Background(), 1) - - require.NotNil(t, lb) - require.Equal(t, peerFromSet, peerResult) - require.Nil(t, err) } func TestDispatcherProviders(t *testing.T) { @@ -166,25 +109,22 @@ func TestDispatcherProviders(t *testing.T) { closeCh := make(chan struct{}) defer close(closeCh) - d := newDispatcher(ch, 5*time.Second) + d := NewDispatcher(ch, 5*time.Second) go handleRequests(t, d, ch, closeCh) peers := createPeerSet(5) - for _, peer := range peers { - d.addPeer(peer) + providers := make([]*blockProvider, len(peers)) + for idx, peer := range peers { + providers[idx] = NewBlockProvider(peer, chainID, d) } - - providers := d.Providers(chainID) require.Len(t, providers, 5) + for i, p := range providers { - bp, ok := p.(*blockProvider) - require.True(t, ok) - assert.Equal(t, string(peers[i]), bp.String(), i) + assert.Equal(t, string(peers[i]), p.String(), i) lb, err := p.LightBlock(context.Background(), 10) assert.NoError(t, err) assert.NotNil(t, lb) } - require.Equal(t, 0, d.peerCount()) } func TestPeerListBasic(t *testing.T) { @@ -198,7 +138,7 @@ func TestPeerListBasic(t *testing.T) { peerList.Append(peer) } - for idx, peer := range peerList.Peers() { + for idx, peer := range peerList.All() { assert.Equal(t, peer, peerSet[idx]) } @@ -329,7 +269,7 @@ func TestPeerListRemove(t *testing.T) { for _, peer := range peerSet { peerList.Remove(peer) - for _, p := range peerList.Peers() { + for _, p := range peerList.All() { require.NotEqual(t, p, peer) } numPeers-- @@ -339,7 +279,7 @@ func TestPeerListRemove(t *testing.T) { // handleRequests is a helper function usually run in a separate go routine to // imitate the expected responses of the reactor wired to the dispatcher -func handleRequests(t *testing.T, d *dispatcher, ch chan p2p.Envelope, closeCh chan struct{}) { +func handleRequests(t *testing.T, d *Dispatcher, ch chan p2p.Envelope, closeCh chan struct{}) { t.Helper() for { select { @@ -348,9 +288,9 @@ func handleRequests(t *testing.T, d *dispatcher, ch chan p2p.Envelope, closeCh c peer := request.To resp := mockLBResp(t, peer, int64(height), time.Now()) block, _ := resp.block.ToProto() - require.NoError(t, d.respond(block, resp.peer)) + require.NoError(t, d.Respond(block, resp.peer)) case <-closeCh: - d.stop() + d.Stop() return } } diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 38e4b5c3a..8157d5934 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -17,6 +17,7 @@ import ( "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/light" + "github.com/tendermint/tendermint/light/provider" ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" @@ -142,13 +143,14 @@ type Reactor struct { paramsCh *p2p.Channel peerUpdates *p2p.PeerUpdates closeCh chan struct{} + peers *peerList + dispatcher *Dispatcher // These will only be set when a state sync is in progress. It is used to feed // received snapshots and chunks into the sync. And to fetch light blocks and // consensus params for verification and building of tendermint state. 
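// Editorial sketch, not part of the patch: the rolling peerList behaviour the
// renamed tests above exercise, now owned by the reactor rather than the
// dispatcher. Pop blocks until a peer is available and takes it out of the
// rotation; Append puts it back (or hands it straight to a blocked Pop);
// Remove drops it. Whether a failing peer is recycled is an assumption made
// here purely for illustration.
func examplePeerRotation(ctx context.Context, peers *peerList, d *Dispatcher, height int64) (*types.LightBlock, error) {
	peer := peers.Pop(ctx)
	lb, err := d.LightBlock(ctx, height, peer)
	if err != nil {
		// do not put an unresponsive peer back into rotation
		return nil, err
	}
	peers.Append(peer)
	return lb, nil
}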
- mtx tmsync.RWMutex - syncer *syncer - dispatcher *dispatcher + mtx tmsync.RWMutex + syncer *syncer } // NewReactor returns a reference to a new state sync reactor, which implements @@ -179,6 +181,8 @@ func NewReactor( tempDir: tempDir, stateStore: stateStore, blockStore: blockStore, + peers: newPeerList(), + dispatcher: NewDispatcher(blockCh.Out, lightBlockResponseTimeout), } r.BaseService = *service.NewBaseService(logger, "StateSync", r) @@ -215,7 +219,7 @@ func (r *Reactor) OnStart() error { // blocking until they all exit. func (r *Reactor) OnStop() { // tell the dispatcher to stop sending any more requests - r.dispatcher.stop() + r.dispatcher.Stop() // Close closeCh to signal to all spawned goroutines to gracefully exit. All // p2p Channels should execute Close(). @@ -241,11 +245,10 @@ func (r *Reactor) Sync( initialHeight int64, ) (sm.State, error) { r.mtx.Lock() - if r.syncer != nil || r.dispatcher != nil { + if r.syncer != nil { r.mtx.Unlock() return sm.State{}, errors.New("a state sync is already in progress") } - r.dispatcher = newDispatcher(r.blockCh.Out, lightBlockResponseTimeout) r.mtx.Unlock() to := light.TrustOptions{ @@ -263,7 +266,13 @@ func (r *Reactor) Sync( // state provider needs at least two connected peers to initialize spLogger.Info("Generating P2P state provider") r.waitForEnoughPeers(ctx, 2) - stateProvider, err = NewP2PStateProvider(ctx, chainID, initialHeight, r.dispatcher, to, r.paramsCh.Out, spLogger) + peers := r.peers.All() + providers := make([]provider.Provider, len(peers)) + for idx, p := range peers { + providers[idx] = NewBlockProvider(p, chainID, r.dispatcher) + } + + stateProvider, err = NewP2PStateProvider(ctx, chainID, initialHeight, providers, to, r.paramsCh.Out, spLogger) if err != nil { return sm.State{}, err } @@ -381,8 +390,9 @@ func (r *Reactor) backfill( for { select { case height := <-queue.nextHeight(): - r.Logger.Debug("fetching next block", "height", height) - lb, peer, err := r.dispatcher.LightBlock(ctxWithCancel, height) + peer := r.peers.Pop(ctx) + r.Logger.Debug("fetching next block", "height", height, "peer", peer) + lb, err := r.dispatcher.LightBlock(ctxWithCancel, height, peer) if errors.Is(err, context.Canceled) { return } @@ -404,7 +414,7 @@ func (r *Reactor) backfill( queue.retry(height) // As we are fetching blocks backwards, if this node doesn't have the block it likely doesn't // have any prior ones, thus we remove it from the peer list. 
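// Editorial sketch, not part of the patch: the wiring Sync performs above. With
// Dispatcher.Providers removed, the reactor turns its own peer list into
// light-client providers before building the P2P state provider. chainID is an
// assumed parameter name.
func exampleProvidersFromPeers(r *Reactor, chainID string) []provider.Provider {
	peers := r.peers.All()
	providers := make([]provider.Provider, len(peers))
	for i, p := range peers {
		providers[i] = NewBlockProvider(p, chainID, r.dispatcher)
	}
	return providers
}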
- r.dispatcher.removePeer(peer) + r.peers.Remove(peer) continue } @@ -692,7 +702,7 @@ func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error { case *ssproto.LightBlockResponse: r.Logger.Info("received light block response") if r.dispatcher != nil { - if err := r.dispatcher.respond(msg.LightBlock, envelope.From); err != nil { + if err := r.dispatcher.Respond(msg.LightBlock, envelope.From); err != nil { r.Logger.Error("error processing light block response", "err", err) } } @@ -847,17 +857,13 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { switch peerUpdate.Status { case p2p.PeerStatusUp: - if r.dispatcher != nil { - r.dispatcher.addPeer(peerUpdate.NodeID) - } + r.peers.Append(peerUpdate.NodeID) if r.syncer != nil { r.syncer.AddPeer(peerUpdate.NodeID) } case p2p.PeerStatusDown: - if r.dispatcher != nil { - r.dispatcher.removePeer(peerUpdate.NodeID) - } + r.peers.Remove(peerUpdate.NodeID) if r.syncer != nil { r.syncer.RemovePeer(peerUpdate.NodeID) } @@ -960,7 +966,7 @@ func (r *Reactor) waitForEnoughPeers(ctx context.Context, numPeers int) { case <-ctx.Done(): return case <-time.After(200 * time.Millisecond): - if r.dispatcher.peerCount() >= numPeers { + if r.peers.Len() >= numPeers { return } } diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index 6a73449ae..357be2dda 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -157,7 +157,7 @@ func setup( ) // override the dispatcher with one with a shorter timeout - rts.reactor.dispatcher = newDispatcher(rts.blockChannel.Out, 1*time.Second) + rts.reactor.dispatcher = NewDispatcher(rts.blockChannel.Out, 1*time.Second) rts.syncer = newSyncer( *cfg, @@ -389,7 +389,7 @@ func TestReactor_LightBlockResponse(t *testing.T) { } } -func TestReactor_Dispatcher(t *testing.T) { +func TestReactor_BlockProviders(t *testing.T) { rts := setup(t, nil, nil, nil, 2) rts.peerUpdateCh <- p2p.PeerUpdate{ NodeID: types.NodeID("aa"), @@ -406,10 +406,13 @@ func TestReactor_Dispatcher(t *testing.T) { chain := buildLightBlockChain(t, 1, 10, time.Now()) go handleLightBlockRequests(t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0) - dispatcher := rts.reactor.dispatcher - providers := dispatcher.Providers(factory.DefaultTestChainID) - require.Len(t, providers, 2) - require.Equal(t, 0, dispatcher.peerCount()) + peers := rts.reactor.peers.All() + require.Len(t, peers, 2) + + providers := make([]provider.Provider, len(peers)) + for idx, peer := range peers { + providers[idx] = NewBlockProvider(peer, factory.DefaultTestChainID, rts.reactor.dispatcher) + } wg := sync.WaitGroup{} @@ -436,7 +439,6 @@ func TestReactor_Dispatcher(t *testing.T) { t.Fail() case <-ctx.Done(): } - require.Equal(t, 0, dispatcher.peerCount()) } @@ -464,7 +466,9 @@ func TestReactor_P2P_Provider(t *testing.T) { // we now test the p2p state provider ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - lb, _, err := rts.reactor.dispatcher.LightBlock(ctx, 2) + peers := rts.reactor.peers.All() + require.Len(t, peers, 2) + lb, err := rts.reactor.dispatcher.LightBlock(ctx, 2, peers[0]) require.NoError(t, err) to := light.TrustOptions{ Period: 24 * time.Hour, @@ -472,7 +476,12 @@ func TestReactor_P2P_Provider(t *testing.T) { Hash: lb.Hash(), } - p2pStateProvider, err := NewP2PStateProvider(ctx, "test-chain", 1, rts.reactor.dispatcher, + providers := make([]provider.Provider, len(peers)) + for idx, peer := range peers { + providers[idx] = NewBlockProvider(peer, 
factory.DefaultTestChainID, rts.reactor.dispatcher) + } + + p2pStateProvider, err := NewP2PStateProvider(ctx, "test-chain", 1, providers, to, rts.reactor.paramsCh.Out, log.TestingLogger()) require.NoError(t, err) // set the state provider else the test won't think we are state syncing diff --git a/internal/statesync/stateprovider.go b/internal/statesync/stateprovider.go index 7870b4e02..8c6233f13 100644 --- a/internal/statesync/stateprovider.go +++ b/internal/statesync/stateprovider.go @@ -201,7 +201,6 @@ func rpcClient(server string) (*rpchttp.HTTP, error) { type stateProviderP2P struct { tmsync.Mutex // light.Client is not concurrency-safe lc *light.Client - dispatcher *dispatcher initialHeight int64 paramsSendCh chan<- p2p.Envelope paramsRecvCh chan types.ConsensusParams @@ -213,12 +212,11 @@ func NewP2PStateProvider( ctx context.Context, chainID string, initialHeight int64, - dispatcher *dispatcher, + providers []lightprovider.Provider, trustOptions light.TrustOptions, paramsSendCh chan<- p2p.Envelope, logger log.Logger, ) (StateProvider, error) { - providers := dispatcher.Providers(chainID) if len(providers) < 2 { return nil, fmt.Errorf("at least 2 peers are required, got %d", len(providers)) } @@ -234,7 +232,6 @@ func NewP2PStateProvider( return &stateProviderP2P{ lc: lc, initialHeight: initialHeight, - dispatcher: dispatcher, paramsSendCh: paramsSendCh, paramsRecvCh: make(chan types.ConsensusParams), }, nil @@ -336,10 +333,9 @@ func (s *stateProviderP2P) State(ctx context.Context, height uint64) (sm.State, return state, nil } -func (s *stateProviderP2P) addPeer(peer types.NodeID) { +func (s *stateProviderP2P) addProvider(p lightprovider.Provider) { if len(s.lc.Witnesses()) < 6 { - provider := s.dispatcher.CreateProvider(peer, s.lc.ChainID()) - s.lc.AddProvider(provider) + s.lc.AddProvider(p) } } diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go index 6062e1617..24a50bfdb 100644 --- a/internal/statesync/syncer.go +++ b/internal/statesync/syncer.go @@ -137,9 +137,6 @@ func (s *syncer) AddPeer(peerID types.NodeID) { To: peerID, Message: &ssproto.SnapshotsRequest{}, } - if stateP2Pprovider, ok := s.stateProvider.(*stateProviderP2P); ok { - stateP2Pprovider.addPeer(peerID) - } } // RemovePeer removes a peer from the pool. diff --git a/light/client_test.go b/light/client_test.go index e5078eda9..e8a478a53 100644 --- a/light/client_test.go +++ b/light/client_test.go @@ -406,10 +406,6 @@ func TestClientLargeBisectionVerification(t *testing.T) { mockNode.AssertExpectations(t) } -func TestHeightThree(t *testing.T) { - primary.LightBlock(context.Background(), 3) -} - func TestClientBisectionBetweenTrustedHeaders(t *testing.T) { mockFullNode := mockNodeFromHeadersAndVals(headerSet, valSet) c, err := light.NewClient(
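Editorial note, not part of the patch: with the dispatcher no longer embedded in stateProviderP2P, a new witness has to be wrapped by the caller before being handed to addProvider, which only adds it while the light client has fewer than six witnesses. A minimal sketch, assuming the caller still holds the chain ID and the reactor's dispatcher (the patch removes the old syncer.AddPeer hook without showing a replacement in this excerpt); stateProvider, peerID, chainID and dispatcher are placeholder names:

	if sp, ok := stateProvider.(*stateProviderP2P); ok {
		// addProvider ignores the new provider once six witnesses exist
		sp.addProvider(NewBlockProvider(peerID, chainID, dispatcher))
	}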