refactor dispatcher

Callum Waters
2021-08-24 18:35:39 +02:00
parent 45b5349317
commit d44d281d80
7 changed files with 161 additions and 293 deletions

View File

@@ -19,111 +19,44 @@ var (
errUnsolicitedResponse = errors.New("unsolicited light block response")
errNoResponse = errors.New("peer failed to respond within timeout")
errPeerAlreadyBusy = errors.New("peer is already processing a request")
errDisconnected = errors.New("dispatcher has been disconnected")
errDisconnected = errors.New("provider has been disconnected")
)
// dispatcher keeps a list of peers and allows concurrent requests for light
// blocks. NOTE: It is not the responsibility of the dispatcher to verify the
// light blocks.
type dispatcher struct {
// a pool of peers to send light block requests to
availablePeers *peerlist
requestCh chan<- p2p.Envelope
// Dispatcher multiplexes concurrent light block requests across multiple peers.
// Only one request per peer can be in flight at a time.
// NOTE: It is not the responsibility of the dispatcher to verify the light blocks.
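// Requests are sent out over requestCh; the reactor routes each response back
// to the waiting caller via Respond.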
type Dispatcher struct {
// the channel used to send light block requests
requestCh chan<- p2p.Envelope
// timeout for light block delivery (immutable)
timeout time.Duration
mtx sync.Mutex
// the set of providers that the dispatcher serves (distinct
// from the available peers)
providers map[types.NodeID]struct{}
// all pending calls that have been dispatched and are awaiting an answer
calls map[types.NodeID]chan *types.LightBlock
// signals whether the underlying reactor is still running
running bool
}
func newDispatcher(requestCh chan<- p2p.Envelope, timeout time.Duration) *dispatcher {
return &dispatcher{
availablePeers: newPeerList(),
timeout: timeout,
requestCh: requestCh,
providers: make(map[types.NodeID]struct{}),
calls: make(map[types.NodeID]chan *types.LightBlock),
running: true,
func NewDispatcher(requestCh chan<- p2p.Envelope, timeout time.Duration) *Dispatcher {
return &Dispatcher{
timeout: timeout,
requestCh: requestCh,
calls: make(map[types.NodeID]chan *types.LightBlock),
running: true,
}
}
// LightBlock uses the request channel to fetch a light block from the next peer
// in the list, tracks the call, and waits for the reactor to pass along the response
func (d *dispatcher) LightBlock(ctx context.Context, height int64) (*types.LightBlock, types.NodeID, error) {
d.mtx.Lock()
// check that the dispatcher is connected to the reactor
if !d.running {
d.mtx.Unlock()
return nil, "", errDisconnected
}
// check that the dispatcher is connected to at least one peer
if d.availablePeers.Len() == 0 && len(d.calls) == 0 {
d.mtx.Unlock()
return nil, "", errNoConnectedPeers
}
d.mtx.Unlock()
// fetch the next peer id in the list and request a light block from that
// peer
peer := d.availablePeers.Pop(ctx)
lb, err := d.lightBlock(ctx, height, peer)
// append the peer back to the list
d.availablePeers.Append(peer)
return lb, peer, err
}
// Providers turns the dispatcher into a set of providers (per peer) which can
// be used by a light client
func (d *dispatcher) Providers(chainID string) []provider.Provider {
providers := make([]provider.Provider, d.availablePeers.Len())
for i := 0; i < cap(providers); i++ {
peer := d.availablePeers.Pop(context.Background())
providers[i] = d.CreateProvider(peer, chainID)
}
return providers
}
// Creates an individual provider from a peer id that the dispatcher is
// connected with.
func (d *dispatcher) CreateProvider(peer types.NodeID, chainID string) provider.Provider {
d.mtx.Lock()
defer d.mtx.Unlock()
d.availablePeers.Remove(peer)
d.providers[peer] = struct{}{}
return &blockProvider{
peer: peer,
dispatcher: d,
chainID: chainID,
timeout: d.timeout,
}
}
func (d *dispatcher) stop() {
d.mtx.Lock()
defer d.mtx.Unlock()
d.running = false
for peer, call := range d.calls {
delete(d.calls, peer)
close(call)
}
}
func (d *dispatcher) lightBlock(ctx context.Context, height int64, peer types.NodeID) (*types.LightBlock, error) {
func (d *Dispatcher) LightBlock(ctx context.Context, height int64, peer types.NodeID) (*types.LightBlock, error) {
// dispatch the request to the peer
callCh, err := d.dispatch(peer, height)
if err != nil {
return nil, err
}
// clean up the call after a response is returned
defer func() {
d.mtx.Lock()
defer d.mtx.Unlock()
@@ -147,9 +80,41 @@ func (d *dispatcher) lightBlock(ctx context.Context, height int64, peer types.No
}
}
// dispatch takes a peer and allocates it a channel so long as the peer is not already
// busy and the dispatcher is still running. It then sends the request to the peer
func (d *Dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.LightBlock, error) {
d.mtx.Lock()
defer d.mtx.Unlock()
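// the channel is buffered by one so a response can be delivered without blocking the responder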
ch := make(chan *types.LightBlock, 1)
// check if the dispatcher is running or not
if !d.running {
close(ch)
return ch, errDisconnected
}
// check if a request for the same peer has already been made
if _, ok := d.calls[peer]; ok {
close(ch)
return ch, errPeerAlreadyBusy
}
d.calls[peer] = ch
// send request
fmt.Printf("sending request dispatch, height %d peer %v\n", height, peer)
d.requestCh <- p2p.Envelope{
To: peer,
Message: &ssproto.LightBlockRequest{
Height: uint64(height),
},
}
fmt.Printf("sent request dispatch, height %d peer %v\n", height, peer)
return ch, nil
}
// respond allows the underlying process which receives requests on the
// requestCh to respond with the respective light block
func (d *dispatcher) respond(lb *proto.LightBlock, peer types.NodeID) error {
func (d *Dispatcher) Respond(lb *proto.LightBlock, peer types.NodeID) error {
fmt.Printf("trying to respond with light block for height %d from %v\n", lb.SignedHeader.Header.Height, peer)
d.mtx.Lock()
defer d.mtx.Unlock()
@@ -169,7 +134,6 @@ func (d *dispatcher) respond(lb *proto.LightBlock, peer types.NodeID) error {
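// convert the proto light block; on failure, send nil so the waiting caller is released before returning the error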
block, err := types.LightBlockFromProto(lb)
if err != nil {
answerCh <- nil
return err
}
@@ -177,67 +141,14 @@ func (d *dispatcher) respond(lb *proto.LightBlock, peer types.NodeID) error {
return nil
}
// addPeer adds a peer to the dispatcher
func (d *dispatcher) addPeer(peer types.NodeID) {
d.availablePeers.Append(peer)
}
// removePeer removes a peer from the dispatcher
func (d *dispatcher) removePeer(peer types.NodeID) {
func (d *Dispatcher) Stop() {
d.mtx.Lock()
defer d.mtx.Unlock()
if call, ok := d.calls[peer]; ok {
call <- nil
close(call)
d.running = false
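// close every outstanding call so that callers blocked in LightBlock return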
for peer, call := range d.calls {
delete(d.calls, peer)
} else {
d.availablePeers.Remove(peer)
close(call)
}
delete(d.providers, peer)
}
// peerCount returns the number of peers that the dispatcher is connected with
func (d *dispatcher) peerCount() int {
return d.availablePeers.Len()
}
func (d *dispatcher) isConnected(peer types.NodeID) bool {
d.mtx.Lock()
defer d.mtx.Unlock()
_, ok := d.providers[peer]
return ok
}
// dispatch takes a peer and allocates it a channel so long as the peer is not already
// busy and the dispatcher is still running. It then sends the request to the peer
func (d *dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.LightBlock, error) {
d.mtx.Lock()
defer d.mtx.Unlock()
ch := make(chan *types.LightBlock, 1)
// check if the dispatcher is running or not
if !d.running {
close(ch)
return ch, errDisconnected
}
// this should happen only if we add the same peer twice (somehow)
if _, ok := d.calls[peer]; ok {
close(ch)
return ch, errPeerAlreadyBusy
}
d.calls[peer] = ch
// send request
fmt.Printf("sending request dispatch, height %d peer %v\n", height, peer)
d.requestCh <- p2p.Envelope{
To: peer,
Message: &ssproto.LightBlockRequest{
Height: uint64(height),
},
}
fmt.Printf("sent request dispatch, height %d peer %v\n", height, peer)
return ch, nil
}
//----------------------------------------------------------------
@@ -250,20 +161,23 @@ func (d *dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.Ligh
type blockProvider struct {
peer types.NodeID
chainID string
timeout time.Duration
dispatcher *dispatcher
dispatcher *Dispatcher
}
func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.LightBlock, error) {
// check if the underlying reactor is still connected with the peer
if !p.dispatcher.isConnected(p.peer) {
return nil, provider.ErrConnectionClosed
// NewBlockProvider creates a block provider which implements the light client Provider interface.
func NewBlockProvider(peer types.NodeID, chainID string, dispatcher *Dispatcher) *blockProvider {
return &blockProvider{
peer: peer,
chainID: chainID,
dispatcher: dispatcher,
}
fmt.Println("fetching block for block provider")
}
ctx, cancel := context.WithTimeout(ctx, p.timeout)
defer cancel()
lb, err := p.dispatcher.lightBlock(ctx, height, p.peer)
// LightBlock fetches a light block from the peer at a specified height, returning either a light block
// or an appropriate error. Not safe for concurrent use.
func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.LightBlock, error) {
fmt.Println("fetching block for block provider")
lb, err := p.dispatcher.LightBlock(ctx, height, p.peer)
switch err {
case nil:
if lb == nil {
@@ -271,12 +185,22 @@ func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.Li
}
case context.DeadlineExceeded, context.Canceled:
return nil, err
case errPeerAlreadyBusy:
return nil, provider.ErrLightBlockNotFound
case errNoResponse:
return nil, provider.ErrNoResponse
default:
return nil, provider.ErrUnreliableProvider{Reason: err.Error()}
}
// check that the height requested is the same one returned
if lb.Height != height {
return nil, provider.ErrBadLightBlock{
Reason: fmt.Errorf("expected height %d, got height %d", height, lb.Height),
}
}
// perform basic validation
if err := lb.ValidateBasic(p.chainID); err != nil {
return nil, provider.ErrBadLightBlock{Reason: err}
}
@@ -299,26 +223,26 @@ func (p *blockProvider) String() string { return string(p.peer) }
// peerList is a rolling list of peers. This is used to distribute the load of
// retrieving blocks over all the peers the reactor is connected to
type peerlist struct {
type peerList struct {
mtx sync.Mutex
peers []types.NodeID
waiting []chan types.NodeID
}
func newPeerList() *peerlist {
return &peerlist{
func newPeerList() *peerList {
return &peerList{
peers: make([]types.NodeID, 0),
waiting: make([]chan types.NodeID, 0),
}
}
func (l *peerlist) Len() int {
func (l *peerList) Len() int {
l.mtx.Lock()
defer l.mtx.Unlock()
return len(l.peers)
}
func (l *peerlist) Pop(ctx context.Context) types.NodeID {
func (l *peerList) Pop(ctx context.Context) types.NodeID {
l.mtx.Lock()
if len(l.peers) == 0 {
// if we don't have any peers in the list we block until a peer is
@@ -342,7 +266,7 @@ func (l *peerlist) Pop(ctx context.Context) types.NodeID {
return peer
}
func (l *peerlist) Append(peer types.NodeID) {
func (l *peerList) Append(peer types.NodeID) {
l.mtx.Lock()
defer l.mtx.Unlock()
if len(l.waiting) > 0 {
@@ -355,7 +279,7 @@ func (l *peerlist) Append(peer types.NodeID) {
}
}
func (l *peerlist) Remove(peer types.NodeID) {
func (l *peerList) Remove(peer types.NodeID) {
l.mtx.Lock()
defer l.mtx.Unlock()
for i, p := range l.peers {
@@ -366,7 +290,7 @@ func (l *peerlist) Remove(peer types.NodeID) {
}
}
func (l *peerlist) Peers() []types.NodeID {
func (l *peerList) All() []types.NodeID {
l.mtx.Lock()
defer l.mtx.Unlock()
return l.peers

View File

@@ -17,6 +17,10 @@ import (
"github.com/tendermint/tendermint/types"
)
var (
peer, _ = types.NewNodeID(strings.Repeat("a", 2*types.NodeIDByteLength))
)
func TestDispatcherBasic(t *testing.T) {
t.Cleanup(leaktest.Check(t))
@@ -24,14 +28,10 @@ func TestDispatcherBasic(t *testing.T) {
closeCh := make(chan struct{})
defer close(closeCh)
d := newDispatcher(ch, 1*time.Second)
d := NewDispatcher(ch, 1*time.Second)
go handleRequests(t, d, ch, closeCh)
peers := createPeerSet(5)
for _, peer := range peers {
d.addPeer(peer)
}
wg := sync.WaitGroup{}
// make a bunch of async requests and require that the correct responses are
@@ -40,49 +40,48 @@ func TestDispatcherBasic(t *testing.T) {
wg.Add(1)
go func(height int64) {
defer wg.Done()
lb, peer, err := d.LightBlock(context.Background(), height)
lb, err := d.LightBlock(context.Background(), height, peers[i])
require.NoError(t, err)
require.NotNil(t, lb)
require.Equal(t, lb.Height, height)
require.Contains(t, peers, peer)
}(int64(i))
}
wg.Wait()
// we should finish with as many peers as we started out with
assert.Equal(t, 5, d.peerCount())
// assert that all calls were responded to
assert.Empty(t, d.calls)
}
func TestDispatcherReturnsNoBlock(t *testing.T) {
t.Cleanup(leaktest.Check(t))
ch := make(chan p2p.Envelope, 100)
d := newDispatcher(ch, 1*time.Second)
peerFromSet := createPeerSet(1)[0]
d.addPeer(peerFromSet)
d := NewDispatcher(ch, 1*time.Second)
doneCh := make(chan struct{})
go func() {
<-ch
require.NoError(t, d.respond(nil, peerFromSet))
require.NoError(t, d.Respond(nil, peer))
close(doneCh)
}()
lb, peerResult, err := d.LightBlock(context.Background(), 1)
lb, err := d.LightBlock(context.Background(), 1, peer)
<-doneCh
require.Nil(t, lb)
require.Nil(t, err)
require.Equal(t, peerFromSet, peerResult)
}
func TestBlockProviderTimeOutWaitingOnLightBlock(t *testing.T) {
t.Cleanup(leaktest.Check(t))
ch := make(chan p2p.Envelope, 100)
d := newDispatcher(ch, 1*time.Second)
peerFromSet := createPeerSet(1)[0]
d.addPeer(peerFromSet)
p := d.CreateProvider(peerFromSet, "test-chain")
lb, err := p.LightBlock(context.Background(), 1)
d := NewDispatcher(ch, 1*time.Second)
closeCh := make(chan struct{})
defer close(closeCh)
go handleRequests(t, d, ch, closeCh)
provider := NewBlockProvider(peer, "my-chain", d)
lb, err := provider.LightBlock(context.Background(), 1)
require.NoError(t, err)
require.NotNil(t, lb)
}
@@ -90,72 +89,16 @@ func TestBlockProviderTimeOutWaitingOnLightBlock(t *testing.T) {
func TestDispatcherTimeOutWaitingOnLightBlock(t *testing.T) {
t.Cleanup(leaktest.Check(t))
ch := make(chan p2p.Envelope, 100)
d := newDispatcher(ch, 1*time.Second)
peerFromSet := createPeerSet(1)[0]
d.addPeer(peerFromSet)
d := NewDispatcher(ch, 1*time.Second)
ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancelFunc()
lb, peerResult, err := d.LightBlock(ctx, 1)
lb, err := d.LightBlock(ctx, 1, peer)
require.Error(t, err)
require.Equal(t, context.DeadlineExceeded, err)
require.Nil(t, lb)
require.Equal(t, peerFromSet, peerResult)
}
func TestDispatcherErrorsWhenNoPeers(t *testing.T) {
t.Cleanup(leaktest.Check(t))
ch := make(chan p2p.Envelope, 100)
d := newDispatcher(ch, 1*time.Second)
lb, peerResult, err := d.LightBlock(context.Background(), 1)
require.Nil(t, lb)
require.Empty(t, peerResult)
require.Equal(t, errNoConnectedPeers, err)
}
func TestDispatcherReturnsBlockOncePeerAvailable(t *testing.T) {
t.Cleanup(leaktest.Check(t))
dispatcherRequestCh := make(chan p2p.Envelope, 100)
d := newDispatcher(dispatcherRequestCh, 1*time.Second)
peerFromSet := createPeerSet(1)[0]
d.addPeer(peerFromSet)
ctx := context.Background()
wrapped, cancelFunc := context.WithCancel(ctx)
doneCh := make(chan struct{})
go func() {
lb, peerResult, err := d.LightBlock(wrapped, 1)
require.Nil(t, lb)
require.Equal(t, peerFromSet, peerResult)
require.Equal(t, context.Canceled, err)
// calls to dispatcher.LightBlock write into the dispatcher's requestCh.
// we read from the requestCh here to unblock the requestCh for future
// calls.
<-dispatcherRequestCh
close(doneCh)
}()
cancelFunc()
<-doneCh
go func() {
<-dispatcherRequestCh
lb := &types.LightBlock{}
asProto, err := lb.ToProto()
require.Nil(t, err)
err = d.respond(asProto, peerFromSet)
require.Nil(t, err)
}()
lb, peerResult, err := d.LightBlock(context.Background(), 1)
require.NotNil(t, lb)
require.Equal(t, peerFromSet, peerResult)
require.Nil(t, err)
}
func TestDispatcherProviders(t *testing.T) {
@@ -166,25 +109,22 @@ func TestDispatcherProviders(t *testing.T) {
closeCh := make(chan struct{})
defer close(closeCh)
d := newDispatcher(ch, 5*time.Second)
d := NewDispatcher(ch, 5*time.Second)
go handleRequests(t, d, ch, closeCh)
peers := createPeerSet(5)
for _, peer := range peers {
d.addPeer(peer)
providers := make([]*blockProvider, len(peers))
for idx, peer := range peers {
providers[idx] = NewBlockProvider(peer, chainID, d)
}
providers := d.Providers(chainID)
require.Len(t, providers, 5)
for i, p := range providers {
bp, ok := p.(*blockProvider)
require.True(t, ok)
assert.Equal(t, string(peers[i]), bp.String(), i)
assert.Equal(t, string(peers[i]), p.String(), i)
lb, err := p.LightBlock(context.Background(), 10)
assert.NoError(t, err)
assert.NotNil(t, lb)
}
require.Equal(t, 0, d.peerCount())
}
func TestPeerListBasic(t *testing.T) {
@@ -198,7 +138,7 @@ func TestPeerListBasic(t *testing.T) {
peerList.Append(peer)
}
for idx, peer := range peerList.Peers() {
for idx, peer := range peerList.All() {
assert.Equal(t, peer, peerSet[idx])
}
@@ -329,7 +269,7 @@ func TestPeerListRemove(t *testing.T) {
for _, peer := range peerSet {
peerList.Remove(peer)
for _, p := range peerList.Peers() {
for _, p := range peerList.All() {
require.NotEqual(t, p, peer)
}
numPeers--
@@ -339,7 +279,7 @@ func TestPeerListRemove(t *testing.T) {
// handleRequests is a helper function, usually run in a separate goroutine, to
// imitate the expected responses of the reactor wired to the dispatcher
func handleRequests(t *testing.T, d *dispatcher, ch chan p2p.Envelope, closeCh chan struct{}) {
func handleRequests(t *testing.T, d *Dispatcher, ch chan p2p.Envelope, closeCh chan struct{}) {
t.Helper()
for {
select {
@@ -348,9 +288,9 @@ func handleRequests(t *testing.T, d *dispatcher, ch chan p2p.Envelope, closeCh c
peer := request.To
resp := mockLBResp(t, peer, int64(height), time.Now())
block, _ := resp.block.ToProto()
require.NoError(t, d.respond(block, resp.peer))
require.NoError(t, d.Respond(block, resp.peer))
case <-closeCh:
d.stop()
d.Stop()
return
}
}

View File

@@ -17,6 +17,7 @@ import (
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/light"
"github.com/tendermint/tendermint/light/provider"
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
@@ -142,13 +143,14 @@ type Reactor struct {
paramsCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
peers *peerList
dispatcher *Dispatcher
// These will only be set when a state sync is in progress. They are used to feed
// received snapshots and chunks into the sync, and to fetch light blocks and
// consensus params for verification and for building Tendermint state.
mtx tmsync.RWMutex
syncer *syncer
dispatcher *dispatcher
mtx tmsync.RWMutex
syncer *syncer
}
// NewReactor returns a reference to a new state sync reactor, which implements
@@ -179,6 +181,8 @@ func NewReactor(
tempDir: tempDir,
stateStore: stateStore,
blockStore: blockStore,
peers: newPeerList(),
dispatcher: NewDispatcher(blockCh.Out, lightBlockResponseTimeout),
}
r.BaseService = *service.NewBaseService(logger, "StateSync", r)
@@ -215,7 +219,7 @@ func (r *Reactor) OnStart() error {
// blocking until they all exit.
func (r *Reactor) OnStop() {
// tell the dispatcher to stop sending any more requests
r.dispatcher.stop()
r.dispatcher.Stop()
// Close closeCh to signal to all spawned goroutines to gracefully exit. All
// p2p Channels should execute Close().
@@ -241,11 +245,10 @@ func (r *Reactor) Sync(
initialHeight int64,
) (sm.State, error) {
r.mtx.Lock()
if r.syncer != nil || r.dispatcher != nil {
if r.syncer != nil {
r.mtx.Unlock()
return sm.State{}, errors.New("a state sync is already in progress")
}
r.dispatcher = newDispatcher(r.blockCh.Out, lightBlockResponseTimeout)
r.mtx.Unlock()
to := light.TrustOptions{
@@ -263,7 +266,13 @@ func (r *Reactor) Sync(
// state provider needs at least two connected peers to initialize
spLogger.Info("Generating P2P state provider")
r.waitForEnoughPeers(ctx, 2)
stateProvider, err = NewP2PStateProvider(ctx, chainID, initialHeight, r.dispatcher, to, r.paramsCh.Out, spLogger)
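// wrap every connected peer in a block provider and hand the set to the p2p state provider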
peers := r.peers.All()
providers := make([]provider.Provider, len(peers))
for idx, p := range peers {
providers[idx] = NewBlockProvider(p, chainID, r.dispatcher)
}
stateProvider, err = NewP2PStateProvider(ctx, chainID, initialHeight, providers, to, r.paramsCh.Out, spLogger)
if err != nil {
return sm.State{}, err
}
@@ -381,8 +390,9 @@ func (r *Reactor) backfill(
for {
select {
case height := <-queue.nextHeight():
r.Logger.Debug("fetching next block", "height", height)
lb, peer, err := r.dispatcher.LightBlock(ctxWithCancel, height)
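// take the next available peer from the pool (Pop blocks until one is free)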
peer := r.peers.Pop(ctx)
r.Logger.Debug("fetching next block", "height", height, "peer", peer)
lb, err := r.dispatcher.LightBlock(ctxWithCancel, height, peer)
if errors.Is(err, context.Canceled) {
return
}
@@ -404,7 +414,7 @@ func (r *Reactor) backfill(
queue.retry(height)
// As we are fetching blocks backwards, if this node doesn't have the block it likely doesn't
// have any prior ones, thus we remove it from the peer list.
r.dispatcher.removePeer(peer)
r.peers.Remove(peer)
continue
}
@@ -692,7 +702,7 @@ func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error {
case *ssproto.LightBlockResponse:
r.Logger.Info("received light block response")
if r.dispatcher != nil {
if err := r.dispatcher.respond(msg.LightBlock, envelope.From); err != nil {
if err := r.dispatcher.Respond(msg.LightBlock, envelope.From); err != nil {
r.Logger.Error("error processing light block response", "err", err)
}
}
@@ -847,17 +857,13 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
switch peerUpdate.Status {
case p2p.PeerStatusUp:
if r.dispatcher != nil {
r.dispatcher.addPeer(peerUpdate.NodeID)
}
r.peers.Append(peerUpdate.NodeID)
if r.syncer != nil {
r.syncer.AddPeer(peerUpdate.NodeID)
}
case p2p.PeerStatusDown:
if r.dispatcher != nil {
r.dispatcher.removePeer(peerUpdate.NodeID)
}
r.peers.Remove(peerUpdate.NodeID)
if r.syncer != nil {
r.syncer.RemovePeer(peerUpdate.NodeID)
}
@@ -960,7 +966,7 @@ func (r *Reactor) waitForEnoughPeers(ctx context.Context, numPeers int) {
case <-ctx.Done():
return
case <-time.After(200 * time.Millisecond):
if r.dispatcher.peerCount() >= numPeers {
if r.peers.Len() >= numPeers {
return
}
}

View File

@@ -157,7 +157,7 @@ func setup(
)
// override the dispatcher with one that has a shorter timeout
rts.reactor.dispatcher = newDispatcher(rts.blockChannel.Out, 1*time.Second)
rts.reactor.dispatcher = NewDispatcher(rts.blockChannel.Out, 1*time.Second)
rts.syncer = newSyncer(
*cfg,
@@ -389,7 +389,7 @@ func TestReactor_LightBlockResponse(t *testing.T) {
}
}
func TestReactor_Dispatcher(t *testing.T) {
func TestReactor_BlockProviders(t *testing.T) {
rts := setup(t, nil, nil, nil, 2)
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: types.NodeID("aa"),
@@ -406,10 +406,13 @@ func TestReactor_Dispatcher(t *testing.T) {
chain := buildLightBlockChain(t, 1, 10, time.Now())
go handleLightBlockRequests(t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
dispatcher := rts.reactor.dispatcher
providers := dispatcher.Providers(factory.DefaultTestChainID)
require.Len(t, providers, 2)
require.Equal(t, 0, dispatcher.peerCount())
peers := rts.reactor.peers.All()
require.Len(t, peers, 2)
providers := make([]provider.Provider, len(peers))
for idx, peer := range peers {
providers[idx] = NewBlockProvider(peer, factory.DefaultTestChainID, rts.reactor.dispatcher)
}
wg := sync.WaitGroup{}
@@ -436,7 +439,6 @@ func TestReactor_Dispatcher(t *testing.T) {
t.Fail()
case <-ctx.Done():
}
require.Equal(t, 0, dispatcher.peerCount())
}
@@ -464,7 +466,9 @@ func TestReactor_P2P_Provider(t *testing.T) {
// we now test the p2p state provider
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
lb, _, err := rts.reactor.dispatcher.LightBlock(ctx, 2)
peers := rts.reactor.peers.All()
require.Len(t, peers, 2)
lb, err := rts.reactor.dispatcher.LightBlock(ctx, 2, peers[0])
require.NoError(t, err)
to := light.TrustOptions{
Period: 24 * time.Hour,
@@ -472,7 +476,12 @@ func TestReactor_P2P_Provider(t *testing.T) {
Hash: lb.Hash(),
}
p2pStateProvider, err := NewP2PStateProvider(ctx, "test-chain", 1, rts.reactor.dispatcher,
providers := make([]provider.Provider, len(peers))
for idx, peer := range peers {
providers[idx] = NewBlockProvider(peer, factory.DefaultTestChainID, rts.reactor.dispatcher)
}
p2pStateProvider, err := NewP2PStateProvider(ctx, "test-chain", 1, providers,
to, rts.reactor.paramsCh.Out, log.TestingLogger())
require.NoError(t, err)
// set the state provider, otherwise the test won't think we are state syncing

View File

@@ -201,7 +201,6 @@ func rpcClient(server string) (*rpchttp.HTTP, error) {
type stateProviderP2P struct {
tmsync.Mutex // light.Client is not concurrency-safe
lc *light.Client
dispatcher *dispatcher
initialHeight int64
paramsSendCh chan<- p2p.Envelope
paramsRecvCh chan types.ConsensusParams
@@ -213,12 +212,11 @@ func NewP2PStateProvider(
ctx context.Context,
chainID string,
initialHeight int64,
dispatcher *dispatcher,
providers []lightprovider.Provider,
trustOptions light.TrustOptions,
paramsSendCh chan<- p2p.Envelope,
logger log.Logger,
) (StateProvider, error) {
providers := dispatcher.Providers(chainID)
if len(providers) < 2 {
return nil, fmt.Errorf("at least 2 peers are required, got %d", len(providers))
}
@@ -234,7 +232,6 @@ func NewP2PStateProvider(
return &stateProviderP2P{
lc: lc,
initialHeight: initialHeight,
dispatcher: dispatcher,
paramsSendCh: paramsSendCh,
paramsRecvCh: make(chan types.ConsensusParams),
}, nil
@@ -336,10 +333,9 @@ func (s *stateProviderP2P) State(ctx context.Context, height uint64) (sm.State,
return state, nil
}
func (s *stateProviderP2P) addPeer(peer types.NodeID) {
func (s *stateProviderP2P) addProvider(p lightprovider.Provider) {
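// only add the provider while the light client has fewer than six witnesses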
if len(s.lc.Witnesses()) < 6 {
provider := s.dispatcher.CreateProvider(peer, s.lc.ChainID())
s.lc.AddProvider(provider)
s.lc.AddProvider(p)
}
}

View File

@@ -137,9 +137,6 @@ func (s *syncer) AddPeer(peerID types.NodeID) {
To: peerID,
Message: &ssproto.SnapshotsRequest{},
}
if stateP2Pprovider, ok := s.stateProvider.(*stateProviderP2P); ok {
stateP2Pprovider.addPeer(peerID)
}
}
// RemovePeer removes a peer from the pool.

View File

@@ -406,10 +406,6 @@ func TestClientLargeBisectionVerification(t *testing.T) {
mockNode.AssertExpectations(t)
}
func TestHeightThree(t *testing.T) {
primary.LightBlock(context.Background(), 3)
}
func TestClientBisectionBetweenTrustedHeaders(t *testing.T) {
mockFullNode := mockNodeFromHeadersAndVals(headerSet, valSet)
c, err := light.NewClient(