p2p state provider

Callum Waters
2021-08-12 06:04:06 +02:00
parent f0ccd0fcd1
commit aeb6058210
6 changed files with 67 additions and 52 deletions

View File

@@ -48,6 +48,7 @@ func newDispatcher(requestCh chan<- p2p.Envelope, timeout time.Duration) *dispat
requestCh: requestCh,
providers: make(map[types.NodeID]struct{}),
calls: make(map[types.NodeID]chan *types.LightBlock),
running: true,
}
}
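A hedged, self-contained sketch of the lifecycle simplification this hunk introduces: the dispatcher is now constructed with running already true, so the separate start() call (deleted further down) is no longer needed and stop() is the only remaining transition. The fields below are simplified stand-ins, not the commit's actual struct.

package main

import (
	"fmt"
	"sync"
)

type dispatcher struct {
	mtx     sync.Mutex
	running bool
}

func newDispatcher() *dispatcher {
	// running starts true: no separate start() call is needed
	return &dispatcher{running: true}
}

func (d *dispatcher) stop() {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	d.running = false
}

func main() {
	d := newDispatcher()
	fmt.Println(d.running) // true: immediately usable
	d.stop()
	fmt.Println(d.running) // false
}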
@@ -69,8 +70,10 @@ func (d *dispatcher) LightBlock(ctx context.Context, height int64) (*types.Light
// fetch the next peer id in the list and request a light block from that
// peer
peer := d.availablePeers.Pop(ctx)
defer d.release(peer)
lb, err := d.lightBlock(ctx, height, peer)
// append the peer back to the list
d.availablePeers.Append(peer)
return lb, peer, err
}
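The hunk above replaces the old defer d.release(peer) with an explicit append: the caller pops a peer, makes the request, and puts the peer straight back so it can serve the next call. A minimal runnable sketch of that recycling pattern, where only Pop and Append mirror the diff; the peerList internals and nodeID type are assumptions for illustration.

package main

import (
	"context"
	"fmt"
)

type nodeID string

// peerList hands peers out one at a time over a buffered channel.
type peerList struct{ ch chan nodeID }

func newPeerList(capacity int) *peerList {
	return &peerList{ch: make(chan nodeID, capacity)}
}

// Pop blocks until a peer is available or the context is cancelled.
func (l *peerList) Pop(ctx context.Context) nodeID {
	select {
	case p := <-l.ch:
		return p
	case <-ctx.Done():
		return ""
	}
}

// Append returns a peer to the back of the list.
func (l *peerList) Append(p nodeID) { l.ch <- p }

func main() {
	peers := newPeerList(5)
	peers.Append("validator01")

	peer := peers.Pop(context.Background())
	// ... request a light block from peer here ...
	fmt.Println("requested light block from", peer)

	// append the peer back to the list, as the new LightBlock body does
	peers.Append(peer)
}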
@@ -111,12 +114,6 @@ func (d *dispatcher) stop() {
}
}
func (d *dispatcher) start() {
d.mtx.Lock()
defer d.mtx.Unlock()
d.running = true
}
func (d *dispatcher) lightBlock(ctx context.Context, height int64, peer types.NodeID) (*types.LightBlock, error) {
// dispatch the request to the peer
callCh, err := d.dispatch(peer, height)
@@ -235,14 +232,6 @@ func (d *dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.Ligh
return ch, nil
}
// release appends the peer back to the list and deletes the allocated call so
// that a new call can be made to that peer
func (d *dispatcher) release(peer types.NodeID) {
d.mtx.Lock()
defer d.mtx.Unlock()
d.availablePeers.Append(peer)
delete(d.calls, peer)
}
//----------------------------------------------------------------
// blockProvider is a p2p based light provider which uses a dispatcher connected
@@ -266,12 +255,18 @@ func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.Li
ctx, cancel := context.WithTimeout(ctx, p.timeout)
defer cancel()
lb, err := p.dispatcher.lightBlock(ctx, height, p.peer)
if err != nil {
switch err {
case nil:
if lb == nil {
return nil, provider.ErrLightBlockNotFound
}
case context.DeadlineExceeded, context.Canceled:
return nil, err
case errNoResponse:
return nil, provider.ErrNoResponse
default:
return nil, provider.ErrUnreliableProvider{Reason: err.Error()}
}
if lb == nil {
return nil, provider.ErrLightBlockNotFound
}
if err := lb.ValidateBasic(p.chainID); err != nil {
return nil, provider.ErrBadLightBlock{Reason: err}
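The new switch in blockProvider.LightBlock above maps transport-level failures onto the provider package's error types: context errors pass through untouched, errNoResponse becomes provider.ErrNoResponse, and anything else marks the peer unreliable. A hedged sketch of that classification, using stand-in error values rather than the real provider errors.

package main

import (
	"context"
	"errors"
	"fmt"
)

// Stand-ins for errNoResponse and the provider package's error values.
var (
	errNoResponse         = errors.New("no response from peer")
	errLightBlockNotFound = errors.New("light block not found")
	errUnreliableProvider = errors.New("unreliable provider")
)

// classify mirrors the switch added to blockProvider.LightBlock.
func classify(lb interface{}, err error) error {
	switch err {
	case nil:
		if lb == nil {
			return errLightBlockNotFound
		}
		return nil // proceed to ValidateBasic on the block
	case context.DeadlineExceeded, context.Canceled:
		return err // pass context errors through untouched
	case errNoResponse:
		return errNoResponse
	default:
		return fmt.Errorf("%w: %v", errUnreliableProvider, err)
	}
}

func main() {
	fmt.Println(classify(nil, nil))           // light block not found
	fmt.Println(classify(nil, errNoResponse)) // no response from peer
}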

View File

@@ -25,7 +25,6 @@ func TestDispatcherBasic(t *testing.T) {
defer close(closeCh)
d := newDispatcher(ch, 1*time.Second)
d.start()
go handleRequests(t, d, ch, closeCh)
peers := createPeerSet(5)
@@ -58,7 +57,6 @@ func TestDispatcherReturnsNoBlock(t *testing.T) {
t.Cleanup(leaktest.Check(t))
ch := make(chan p2p.Envelope, 100)
d := newDispatcher(ch, 1*time.Second)
d.start()
peerFromSet := createPeerSet(1)[0]
d.addPeer(peerFromSet)
doneCh := make(chan struct{})
@@ -81,7 +79,6 @@ func TestDispatcherErrorsWhenNoPeers(t *testing.T) {
t.Cleanup(leaktest.Check(t))
ch := make(chan p2p.Envelope, 100)
d := newDispatcher(ch, 1*time.Second)
d.start()
lb, peerResult, err := d.LightBlock(context.Background(), 1)
@@ -94,7 +91,6 @@ func TestDispatcherReturnsBlockOncePeerAvailable(t *testing.T) {
t.Cleanup(leaktest.Check(t))
dispatcherRequestCh := make(chan p2p.Envelope, 100)
d := newDispatcher(dispatcherRequestCh, 1*time.Second)
d.start()
peerFromSet := createPeerSet(1)[0]
d.addPeer(peerFromSet)
@@ -142,7 +138,6 @@ func TestDispatcherProviders(t *testing.T) {
defer close(closeCh)
d := newDispatcher(ch, 5*time.Second)
d.start()
go handleRequests(t, d, ch, closeCh)
peers := createPeerSet(5)
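For context, the harness these tests rely on: the dispatcher writes request envelopes to a channel and a handleRequests goroutine answers them in place of real peers; with the dispatcher now constructed running, the d.start() calls above are simply deleted. A simplified, runnable sketch of that request/response loop, with all types assumed.

package main

import "fmt"

// request is a stand-in for the p2p.Envelope the dispatcher emits.
type request struct {
	height int64
	respCh chan string
}

func main() {
	reqCh := make(chan request, 100)
	done := make(chan struct{})

	// stand-in for handleRequests: answer every request with a fake block
	go func() {
		for {
			select {
			case req := <-reqCh:
				req.respCh <- fmt.Sprintf("light block @ height %d", req.height)
			case <-done:
				return
			}
		}
	}()

	resp := make(chan string, 1)
	reqCh <- request{height: 1, respCh: resp}
	fmt.Println(<-resp)
	close(done)
}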

View File

@@ -69,6 +69,17 @@ var (
MaxSendBytes: 400,
},
},
ParamsChannel: {
MsgType: new(ssproto.Message),
Descriptor: &p2p.ChannelDescriptor{
ID: byte(ParamsChannel),
Priority: 2,
SendQueueCapacity: 10,
RecvMessageCapacity: paramMsgSize,
RecvBufferCapacity: 128,
MaxSendBytes: 400,
},
},
}
)
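A hedged sketch of the new params channel registration, using a stand-in for p2p.ChannelDescriptor; the field values mirror the diff, while the ParamsChannel ID constant is an assumption for illustration.

package main

import "fmt"

// Assumed channel ID; the diff shows only byte(ParamsChannel).
const ParamsChannel byte = 0x63

// channelDescriptor stands in for p2p.ChannelDescriptor.
type channelDescriptor struct {
	ID                  byte
	Priority            int
	SendQueueCapacity   int
	RecvMessageCapacity int
	RecvBufferCapacity  int
	MaxSendBytes        uint
}

func main() {
	paramMsgSize := int(1e5) // matches the new paramMsgSize constant (~100KB)
	desc := channelDescriptor{
		ID:                  ParamsChannel,
		Priority:            2,
		SendQueueCapacity:   10,
		RecvMessageCapacity: paramMsgSize,
		RecvBufferCapacity:  128,
		MaxSendBytes:        400,
	}
	fmt.Printf("params channel descriptor: %+v\n", desc)
}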
@@ -97,6 +108,9 @@ const (
// lightBlockMsgSize is the maximum size of a lightBlockResponseMessage
lightBlockMsgSize = int(1e7) // ~10MB
// paramMsgSize is the maximum size of a paramsResponseMessage
paramMsgSize = int(1e5) // ~100KB
// lightBlockResponseTimeout is how long the dispatcher waits for a peer to
// return a light block
lightBlockResponseTimeout = 30 * time.Second
@@ -129,12 +143,12 @@ type Reactor struct {
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
// These will only be set when a state sync is in progress. It is used to feed
// received snapshots and chunks into the sync. And to fetch light blocks and
// consensus params for verification and building of tendermint state.
mtx tmsync.RWMutex
syncer *syncer
dispatcher *dispatcher
// This will only be set when a state sync is in progress. It is used to feed
// received snapshots and chunks into the sync.
mtx tmsync.RWMutex
syncer *syncer
}
// NewReactor returns a reference to a new state sync reactor, which implements
@@ -194,8 +208,6 @@ func (r *Reactor) OnStart() error {
go r.processPeerUpdates()
r.dispatcher.start()
return nil
}
@@ -229,12 +241,12 @@ func (r *Reactor) Sync(
initialHeight int64,
) (sm.State, error) {
r.mtx.Lock()
if r.syncer != nil {
if r.syncer != nil || r.dispatcher != nil {
r.mtx.Unlock()
return sm.State{}, errors.New("a state sync is already in progress")
}
r.dispatcher = newDispatcher(r.blockCh.Out, lightBlockResponseTimeout)
r.mtx.Unlock()
to := light.TrustOptions{
Period: r.cfg.TrustPeriod,
@@ -263,6 +275,7 @@ func (r *Reactor) Sync(
}
}
r.mtx.Lock()
r.syncer = newSyncer(
r.cfg,
r.Logger,
@@ -274,6 +287,12 @@ func (r *Reactor) Sync(
r.tempDir,
)
r.mtx.Unlock()
defer func() {
r.mtx.Lock()
r.syncer = nil
r.dispatcher = nil
r.mtx.Unlock()
}()
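The Sync changes give both the dispatcher and the syncer a per-sync lifetime: creation is guarded by the mutex, double entry is rejected, and a single deferred block tears both down. A minimal sketch of that pattern with stand-in types; in the real code the syncer is only set later, after the trust options are built.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// reactor is a stand-in holding only the two per-sync fields.
type reactor struct {
	mtx        sync.Mutex
	syncer     *struct{}
	dispatcher *struct{}
}

func (r *reactor) Sync() error {
	r.mtx.Lock()
	if r.syncer != nil || r.dispatcher != nil {
		r.mtx.Unlock()
		return errors.New("a state sync is already in progress")
	}
	r.dispatcher = &struct{}{} // newDispatcher(...)
	r.syncer = &struct{}{}     // newSyncer(...)
	r.mtx.Unlock()

	// tear both down when the sync returns, whatever the outcome
	defer func() {
		r.mtx.Lock()
		r.syncer = nil
		r.dispatcher = nil
		r.mtx.Unlock()
	}()

	// ... run the sync ...
	return nil
}

func main() {
	r := &reactor{}
	fmt.Println(r.Sync()) // <nil>
	fmt.Println(r.Sync()) // <nil> again: the first sync cleaned up after itself
}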
requestSnapshotsHook := func() {
// request snapshots from all currently connected peers
@@ -288,10 +307,6 @@ func (r *Reactor) Sync(
return sm.State{}, err
}
r.mtx.Lock()
r.syncer = nil
r.mtx.Unlock()
err = r.stateStore.Bootstrap(state)
if err != nil {
return sm.State{}, fmt.Errorf("failed to bootstrap node with new state: %w", err)
@@ -348,7 +363,7 @@ func (r *Reactor) backfill(
const sleepTime = 1 * time.Second
var (
lastValidatorSet *types.ValidatorSet
lastChangeHeight int64 = startHeight
lastChangeHeight = startHeight
)
queue := newBlockQueue(startHeight, stopHeight, initialHeight, stopTime, maxLightBlockRequestRetries)
@@ -527,7 +542,7 @@ func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error {
return nil
}
logger.Debug("received snapshot", "height", msg.Height, "format", msg.Format)
logger.Info("received snapshot", "height", msg.Height, "format", msg.Format)
_, err := r.syncer.AddSnapshot(envelope.From, &snapshot{
Height: msg.Height,
Format: msg.Format,
@@ -669,8 +684,11 @@ func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error {
}
case *ssproto.LightBlockResponse:
if err := r.dispatcher.respond(msg.LightBlock, envelope.From); err != nil {
r.Logger.Error("error processing light block response", "err", err)
r.Logger.Info("received light block response")
if r.dispatcher != nil {
if err := r.dispatcher.respond(msg.LightBlock, envelope.From); err != nil {
r.Logger.Error("error processing light block response", "err", err)
}
}
default:
@@ -815,22 +833,25 @@ func (r *Reactor) processCh(ch *p2p.Channel, chName string) {
// handle the PeerUpdate or if a panic is recovered.
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
r.Logger.Info("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.mtx.RLock()
defer r.mtx.RUnlock()
r.mtx.Lock()
defer r.mtx.Unlock()
switch peerUpdate.Status {
case p2p.PeerStatusUp:
if r.dispatcher != nil {
r.dispatcher.addPeer(peerUpdate.NodeID)
}
if r.syncer != nil {
r.syncer.AddPeer(peerUpdate.NodeID)
}
r.dispatcher.addPeer(peerUpdate.NodeID)
case p2p.PeerStatusDown:
if r.dispatcher != nil {
r.dispatcher.removePeer(peerUpdate.NodeID)
}
if r.syncer != nil {
r.syncer.RemovePeer(peerUpdate.NodeID)
}
r.dispatcher.removePeer(peerUpdate.NodeID)
}
r.Logger.Info("processed peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
}
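Because the dispatcher now only exists while a sync is in progress, processPeerUpdate takes the full write lock and nil-checks it before forwarding peer updates, mirroring the guard added in handleLightBlockMessage above. A simplified runnable sketch of that guard.

package main

import (
	"fmt"
	"sync"
)

type nodeID string

// dispatcher is a stand-in tracking only its peer set.
type dispatcher struct{ peers map[nodeID]struct{} }

func (d *dispatcher) addPeer(p nodeID)    { d.peers[p] = struct{}{} }
func (d *dispatcher) removePeer(p nodeID) { delete(d.peers, p) }

type reactor struct {
	mtx        sync.Mutex // a full lock now, since peer updates mutate state
	dispatcher *dispatcher
}

func (r *reactor) processPeerUpdate(peer nodeID, up bool) {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	if r.dispatcher == nil {
		return // no sync in progress; nothing to update
	}
	if up {
		r.dispatcher.addPeer(peer)
	} else {
		r.dispatcher.removePeer(peer)
	}
}

func main() {
	r := &reactor{} // dispatcher is nil: updates are safely ignored
	r.processPeerUpdate("validator01", true)

	r.dispatcher = &dispatcher{peers: map[nodeID]struct{}{}}
	r.processPeerUpdate("validator01", true)
	fmt.Println(len(r.dispatcher.peers)) // 1
}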

View File

@@ -472,7 +472,7 @@ func TestReactor_P2P_Provider(t *testing.T) {
Hash: lb.Hash(),
}
p2pStateProvider, err := NewP2PStateProvider(ctx, "test-chain", 1, rts.reactor.dispatcher,
to, rts.reactor.paramsCh.Out, log.TestingLogger())
require.NoError(t, err)
// set the state provider else the test won't think we are state syncing
@@ -661,8 +661,9 @@ func buildLightBlockChain(t *testing.T, fromHeight, toHeight int64, startTime ti
return chain
}
func mockLB(t *testing.T, height int64, time time.Time,
lastBlockID types.BlockID, currentVals *types.ValidatorSet, currentPrivVals []types.PrivValidator) (*types.ValidatorSet, []types.PrivValidator, *types.LightBlock) {
func mockLB(t *testing.T, height int64, time time.Time, lastBlockID types.BlockID,
currentVals *types.ValidatorSet, currentPrivVals []types.PrivValidator,
) (*types.ValidatorSet, []types.PrivValidator, *types.LightBlock) {
header, err := factory.MakeHeader(&types.Header{
Height: height,
LastBlockID: lastBlockID,

View File

@@ -137,6 +137,9 @@ func (s *syncer) AddPeer(peerID types.NodeID) {
To: peerID,
Message: &ssproto.SnapshotsRequest{},
}
if stateP2Pprovider, ok := s.stateProvider.(*stateProviderP2P); ok {
stateP2Pprovider.addPeer(peerID)
}
}
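The AddPeer change forwards new peers to the state provider only when it is the p2p-backed implementation, using a type assertion so the RPC-based provider is untouched. A hedged sketch of that pattern with stand-in types.

package main

import "fmt"

type nodeID string

// stateProvider stands in for the statesync StateProvider interface.
type stateProvider interface{}

// stateProviderP2P is the p2p-backed implementation; only it tracks peers.
type stateProviderP2P struct{ peers []nodeID }

func (p *stateProviderP2P) addPeer(id nodeID) { p.peers = append(p.peers, id) }

// rpcStateProvider stands in for the RPC-backed provider, which has no peers.
type rpcStateProvider struct{}

// addPeer forwards the peer only when the provider is p2p-backed.
func addPeer(sp stateProvider, id nodeID) {
	if p2pProvider, ok := sp.(*stateProviderP2P); ok {
		p2pProvider.addPeer(id)
	}
}

func main() {
	p2p := &stateProviderP2P{}
	addPeer(p2p, "validator01")
	addPeer(&rpcStateProvider{}, "validator02") // safely a no-op
	fmt.Println(len(p2p.peers))                 // 1
}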
// RemovePeer removes a peer from the pool.

View File

@@ -5,8 +5,8 @@ snapshot_interval = 3
[node.validator03]
snapshot_interval = 3
[node.validator04]
snapshot_interval = 3
[node.validator05]
state_sync = true
start_at = 5
persistent_peers = ["validator01"]
[node.validator05]
snapshot_interval = 3
persistent_peers = ["validator01", "validator02"]