mirror of https://github.com/tendermint/tendermint.git

add tests / modify providers setup
@@ -115,10 +115,13 @@ func (d *Dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.Ligh
// respond allows the underlying process which receives requests on the
// requestCh to respond with the respective light block
func (d *Dispatcher) Respond(lb *proto.LightBlock, peer types.NodeID) error {
fmt.Printf("trying to respond with light block for height %d from %v\n", lb.SignedHeader.Header.Height, peer)
if lb != nil {
fmt.Printf("trying to respond with light block for height %d from %v\n", lb.SignedHeader.Header.Height, peer)
} else {
fmt.Println("responded with empty block")
}
d.mtx.Lock()
defer d.mtx.Unlock()
fmt.Printf("responding with light block for height %d from %v\n", lb.SignedHeader.Header.Height, peer)

// check that the response came from a request
answerCh, ok := d.calls[peer]
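The hunk above makes Respond tolerate a nil light block before it touches lb.SignedHeader. As a rough, standalone illustration of the same request/response matching pattern (the names dispatcher, response, and errUnsolicited are placeholders, not the tendermint API), a nil-safe respond might look like:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// response is a stand-in for *proto.LightBlock; nil means "peer had no block".
type response struct{ height int64 }

type dispatcher struct {
	mtx   sync.Mutex
	calls map[string]chan *response // one outstanding call per peer
}

var errUnsolicited = errors.New("response from peer without an open request")

// respond routes a peer's reply to the goroutine waiting on it.
// A nil reply is forwarded as-is so the caller can treat it as "not found".
func (d *dispatcher) respond(r *response, peer string) error {
	if r != nil {
		fmt.Printf("got block at height %d from %s\n", r.height, peer)
	} else {
		fmt.Printf("got empty response from %s\n", peer)
	}

	d.mtx.Lock()
	defer d.mtx.Unlock()

	ch, ok := d.calls[peer]
	if !ok {
		return errUnsolicited
	}
	ch <- r
	close(ch)
	delete(d.calls, peer)
	return nil
}

func main() {
	d := &dispatcher{calls: map[string]chan *response{"peer1": make(chan *response, 1)}}
	_ = d.respond(&response{height: 10}, "peer1") // matched request
	err := d.respond(nil, "peer2")                // no open request
	fmt.Println(err)
}
```

The point mirrored from the diff is that the nil case is handled before any field access, while the per-peer channel lookup still happens under the mutex.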
@@ -189,7 +192,7 @@ func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.Li
return nil, provider.ErrLightBlockNotFound
case errNoResponse:
return nil, provider.ErrNoResponse
default:
default: // errDisconnected
return nil, provider.ErrUnreliableProvider{Reason: err.Error()}
}

@@ -23,6 +23,7 @@ var (

func TestDispatcherBasic(t *testing.T) {
t.Cleanup(leaktest.Check(t))
numPeers := 5

ch := make(chan p2p.Envelope, 100)
closeCh := make(chan struct{})
@@ -31,20 +32,20 @@ func TestDispatcherBasic(t *testing.T) {
d := NewDispatcher(ch, 1*time.Second)
go handleRequests(t, d, ch, closeCh)

peers := createPeerSet(5)
peers := createPeerSet(numPeers)
wg := sync.WaitGroup{}

// make a bunch of async requests and require that the correct responses are
// given
for i := 1; i < 10; i++ {
for i := 0; i < numPeers; i++ {
wg.Add(1)
go func(height int64) {
defer wg.Done()
lb, err := d.LightBlock(context.Background(), height, peers[i])
lb, err := d.LightBlock(context.Background(), height, peers[height - 1])
require.NoError(t, err)
require.NotNil(t, lb)
require.Equal(t, lb.Height, height)
}(int64(i))
}(int64(i + 1))
}
wg.Wait()

@@ -71,21 +72,6 @@ func TestDispatcherReturnsNoBlock(t *testing.T) {
require.Nil(t, err)
}

func TestBlockProviderTimeOutWaitingOnLightBlock(t *testing.T) {
t.Cleanup(leaktest.Check(t))
ch := make(chan p2p.Envelope, 100)
d := NewDispatcher(ch, 1*time.Second)

closeCh := make(chan struct{})
defer close(closeCh)
go handleRequests(t, d, ch, closeCh)

provider := NewBlockProvider(peer, "my-chain", d)
lb, err := provider.LightBlock(context.Background(), 1)
require.NoError(t, err)
require.NotNil(t, lb)
}

func TestDispatcherTimeOutWaitingOnLightBlock(t *testing.T) {
t.Cleanup(leaktest.Check(t))
ch := make(chan p2p.Envelope, 100)
@@ -101,6 +87,21 @@ func TestDispatcherTimeOutWaitingOnLightBlock(t *testing.T) {
require.Nil(t, lb)
}

func TestDispatcherTimeOutWaitingOnLightBlock2(t *testing.T) {
t.Cleanup(leaktest.Check(t))
ch := make(chan p2p.Envelope, 100)
d := NewDispatcher(ch, 10*time.Millisecond)

ctx, cancelFunc := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancelFunc()

lb, err := d.LightBlock(ctx, 1, peer)

require.Error(t, err)
require.Equal(t, errNoResponse, err)
require.Nil(t, lb)
}

func TestDispatcherProviders(t *testing.T) {
t.Cleanup(leaktest.Check(t))

@@ -127,6 +128,15 @@ func TestDispatcherProviders(t *testing.T) {
}
}

func TestDispatcherStopped(t *testing.T) {
ch := make(chan p2p.Envelope, 100)
d := NewDispatcher(ch, 5*time.Second)
d.Stop()

_, err := d.LightBlock(context.Background(), 1, peer)
require.Error(t, err)
}

func TestPeerListBasic(t *testing.T) {
t.Cleanup(leaktest.Check(t))
peerList := newPeerList()
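The rewritten TestDispatcherBasic drives exactly one request per peer and passes the height into each goroutine instead of capturing the shared loop index. A minimal standalone sketch of that fan-out idiom, with a placeholder fetch function standing in for dispatcher.LightBlock:

```go
package main

import (
	"fmt"
	"sync"
)

// fetch is a stand-in for dispatcher.LightBlock: it just echoes the height.
func fetch(height int64, peer string) int64 { return height }

func main() {
	numPeers := 5
	peers := []string{"p1", "p2", "p3", "p4", "p5"}

	var wg sync.WaitGroup
	for i := 0; i < numPeers; i++ {
		wg.Add(1)
		// Pass the height as an argument so each goroutine gets its own copy
		// instead of capturing the shared loop variable i.
		go func(height int64) {
			defer wg.Done()
			got := fetch(height, peers[height-1])
			if got != height {
				panic(fmt.Sprintf("expected %d, got %d", height, got))
			}
		}(int64(i + 1))
	}
	wg.Wait()
	fmt.Println("all responses matched")
}
```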
@@ -130,6 +130,8 @@ const (
type Reactor struct {
service.BaseService

chainID string
initialHeight int64
cfg config.StateSyncConfig
stateStore sm.Store
blockStore *store.BlockStore
@@ -143,14 +145,19 @@ type Reactor struct {
paramsCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
closeCh chan struct{}
peers *peerList
dispatcher *Dispatcher

// Dispatcher is used to multiplex light block requests and responses over multiple
// block providers used by the p2p state provider and in reverse sync.
dispatcher *Dispatcher
peers *peerList

// These will only be set when a state sync is in progress. It is used to feed
// received snapshots and chunks into the sync. And to fetch light blocks and
// consensus params for verification and building of tendermint state.
mtx tmsync.RWMutex
// received snapshots and chunks into the syncer and manage incoming and outgoing
// providers.
mtx tmsync.RWMutex
syncer *syncer
providers map[types.NodeID]*blockProvider
stateProvider StateProvider
}

// NewReactor returns a reference to a new state sync reactor, which implements
@@ -158,6 +165,8 @@ type Reactor struct {
// and querying, references to p2p Channels and a channel to listen for peer
// updates on. Note, the reactor will close all p2p Channels when stopping.
func NewReactor(
chainID string,
initialHeight int64,
cfg config.StateSyncConfig,
logger log.Logger,
conn proxy.AppConnSnapshot,
@@ -169,6 +178,7 @@ func NewReactor(
tempDir string,
) *Reactor {
r := &Reactor{
chainID: chainID,
cfg: cfg,
conn: conn,
connQuery: connQuery,
@@ -183,6 +193,7 @@ func NewReactor(
blockStore: blockStore,
peers: newPeerList(),
dispatcher: NewDispatcher(blockCh.Out, lightBlockResponseTimeout),
providers: make(map[types.NodeID]*blockProvider),
}

r.BaseService = *service.NewBaseService(logger, "StateSync", r)
@@ -244,53 +255,21 @@ func (r *Reactor) Sync(
chainID string,
initialHeight int64,
) (sm.State, error) {
r.waitForEnoughPeers(ctx, 3)
r.mtx.Lock()
if r.syncer != nil {
r.mtx.Unlock()
return sm.State{}, errors.New("a state sync is already in progress")
}
r.mtx.Unlock()

to := light.TrustOptions{
Period: r.cfg.TrustPeriod,
Height: r.cfg.TrustHeight,
Hash: r.cfg.TrustHashBytes(),
}
spLogger := r.Logger.With("module", "stateprovider")
r.initStateProvider(ctx, chainID, initialHeight)

var (
stateProvider StateProvider
err error
)
if r.cfg.UseP2P {
// state provider needs at least two connected peers to initialize
spLogger.Info("Generating P2P state provider")
r.waitForEnoughPeers(ctx, 2)
peers := r.peers.All()
providers := make([]provider.Provider, len(peers))
for idx, p := range peers {
providers[idx] = NewBlockProvider(p, chainID, r.dispatcher)
}

stateProvider, err = NewP2PStateProvider(ctx, chainID, initialHeight, providers, to, r.paramsCh.Out, spLogger)
if err != nil {
return sm.State{}, err
}
spLogger.Info("Finished generating P2P state provider")
} else {
stateProvider, err = NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.RPCServers, to, spLogger)
if err != nil {
return sm.State{}, err
}
}

r.mtx.Lock()
r.syncer = newSyncer(
r.cfg,
r.Logger,
r.conn,
r.connQuery,
stateProvider,
r.stateProvider,
r.snapshotCh.Out,
r.chunkCh.Out,
r.tempDir,
@@ -298,8 +277,9 @@ func (r *Reactor) Sync(
r.mtx.Unlock()
defer func() {
r.mtx.Lock()
// reset syncing objects at the close of Sync
r.syncer = nil
r.dispatcher = nil
r.stateProvider = nil
r.mtx.Unlock()
}()

@@ -311,7 +291,6 @@ func (r *Reactor) Sync(
}
}

r.Logger.Info("sync any starting")
state, commit, err := r.syncer.SyncAny(ctx, r.cfg.DiscoveryTime, requestSnapshotsHook)
if err != nil {
return sm.State{}, err
@@ -393,6 +372,7 @@ func (r *Reactor) backfill(
peer := r.peers.Pop(ctx)
r.Logger.Debug("fetching next block", "height", height, "peer", peer)
lb, err := r.dispatcher.LightBlock(ctxWithCancel, height, peer)
r.peers.Append(peer)
if errors.Is(err, context.Canceled) {
return
}
@@ -571,9 +551,7 @@ func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error {
)
return nil
}
if msg.Height == 3 {
fmt.Println("received snapshot for height 3")
}
logger.Info("added snapshot", "height", msg.Height, "format", msg.Format)

default:
return fmt.Errorf("received unknown message: %T", msg)
@@ -681,7 +659,17 @@ func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error {
r.Logger.Error("failed to retrieve light block", "err", err, "height", msg.Height)
return err
}
r.Logger.Info("fetched light block", "height", lb.SignedHeader.Header.Height)
if lb == nil {
r.Logger.Info("returning nil light block", "height", msg.Height)
r.blockCh.Out <- p2p.Envelope{
To: envelope.From,
Message: &ssproto.LightBlockResponse{
LightBlock: nil,
},
}
r.Logger.Info("sent light block response", "height", msg.Height)
return nil
}

lbproto, err := lb.ToProto()
if err != nil {
@@ -701,10 +689,8 @@ func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error {

case *ssproto.LightBlockResponse:
r.Logger.Info("received light block response")
if r.dispatcher != nil {
if err := r.dispatcher.Respond(msg.LightBlock, envelope.From); err != nil {
r.Logger.Error("error processing light block response", "err", err)
}
if err := r.dispatcher.Respond(msg.LightBlock, envelope.From); err != nil {
r.Logger.Error("error processing light block response", "err", err)
}

default:
@@ -738,19 +724,16 @@ func (r *Reactor) handleParamsMessage(envelope p2p.Envelope) error {
defer r.mtx.RUnlock()
r.Logger.Debug("received consensus params response", "height", msg.Height)

if r.syncer == nil {
r.Logger.Debug("received unexpected params response; no state sync in progress", "peer", envelope.From)
return nil
}

cp := types.ConsensusParamsFromProto(msg.ConsensusParams)

if sp, ok := r.syncer.stateProvider.(*stateProviderP2P); ok {
if sp, ok := r.stateProvider.(*stateProviderP2P); ok {
r.Logger.Debug("passing along message")
select {
case sp.paramsRecvCh <- cp:
default:
}
} else {
r.Logger.Debug("received unexpected params response; using RPC state provider", "peer", envelope.From)
}

default:
@@ -852,21 +835,33 @@ func (r *Reactor) processCh(ch *p2p.Channel, chName string) {
// handle the PeerUpdate or if a panic is recovered.
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
r.Logger.Info("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.mtx.Lock()
defer r.mtx.Unlock()

switch peerUpdate.Status {
case p2p.PeerStatusUp:
r.peers.Append(peerUpdate.NodeID)
if r.syncer != nil {
r.syncer.AddPeer(peerUpdate.NodeID)
}

case p2p.PeerStatusDown:
r.peers.Remove(peerUpdate.NodeID)
if r.syncer != nil {
r.syncer.RemovePeer(peerUpdate.NodeID)
}
}

r.mtx.Lock()
if r.syncer == nil {
r.mtx.Unlock()
return
}
defer r.mtx.Unlock()

switch peerUpdate.Status {
case p2p.PeerStatusUp:
newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher)
r.providers[peerUpdate.NodeID] = newProvider
r.syncer.AddPeer(peerUpdate.NodeID)
// if sp, ok := r.stateProvider.(*stateProviderP2P); ok {
// sp.addProvider(newProvider)
// }

case p2p.PeerStatusDown:
delete(r.providers, peerUpdate.NodeID)
r.syncer.RemovePeer(peerUpdate.NodeID)
}
r.Logger.Info("processed peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
}
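In the updated processPeerUpdate, per-peer block providers are created and torn down only while a sync is running, and the map is mutated under the reactor mutex. A compressed, self-contained sketch of that shape (the reactor and status types here are invented for illustration, not the real p2p.PeerUpdate plumbing):

```go
package main

import (
	"fmt"
	"sync"
)

type status int

const (
	statusUp status = iota
	statusDown
)

// reactor keeps one provider per connected peer, but only while a sync runs.
type reactor struct {
	mtx       sync.Mutex
	syncing   bool
	providers map[string]string // peer ID -> provider handle (placeholder type)
}

// processPeerUpdate mirrors the pattern above: bail out early when no sync is
// in progress, otherwise add or drop the peer's provider under the lock.
func (r *reactor) processPeerUpdate(peer string, s status) {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	if !r.syncing {
		return
	}
	switch s {
	case statusUp:
		r.providers[peer] = "provider-for-" + peer
	case statusDown:
		delete(r.providers, peer)
	}
}

func main() {
	r := &reactor{syncing: true, providers: map[string]string{}}
	r.processPeerUpdate("peer1", statusUp)
	r.processPeerUpdate("peer1", statusDown)
	fmt.Println(len(r.providers)) // 0
}
```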
@@ -972,3 +967,35 @@ func (r *Reactor) waitForEnoughPeers(ctx context.Context, numPeers int) {
}
}
}

func (r *Reactor) initStateProvider(ctx context.Context, chainID string, initialHeight int64) error {
var err error
to := light.TrustOptions{
Period: r.cfg.TrustPeriod,
Height: r.cfg.TrustHeight,
Hash: r.cfg.TrustHashBytes(),
}
spLogger := r.Logger.With("module", "stateprovider")

if r.cfg.UseP2P {
spLogger.Info("Generating P2P state provider")

peers := r.peers.All()
providers := make([]provider.Provider, len(peers))
for idx, p := range peers {
providers[idx] = NewBlockProvider(p, chainID, r.dispatcher)
}

r.stateProvider, err = NewP2PStateProvider(ctx, chainID, initialHeight, providers, to, r.paramsCh.Out, spLogger)
if err != nil {
return err
}
spLogger.Info("Finished generating P2P state provider")
} else {
r.stateProvider, err = NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.RPCServers, to, spLogger)
if err != nil {
return err
}
}
return nil
}
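The new initStateProvider concentrates the choice between the P2P-backed and RPC-backed state provider in one place, so the rest of the sync code only ever sees the StateProvider interface. A self-contained sketch of that selection pattern (the provider types and config fields below are invented for illustration):

```go
package main

import (
	"errors"
	"fmt"
)

// StateProvider is a stand-in for the statesync interface used above.
type StateProvider interface {
	Name() string
}

type p2pProvider struct{ peers []string }
type rpcProvider struct{ servers []string }

func (p *p2pProvider) Name() string { return "p2p" }
func (r *rpcProvider) Name() string { return "rpc" }

type config struct {
	UseP2P     bool
	RPCServers []string
}

// initStateProvider picks the provider implementation once, up front, so the
// rest of the sync logic depends only on the StateProvider interface.
func initStateProvider(cfg config, peers []string) (StateProvider, error) {
	if cfg.UseP2P {
		if len(peers) < 2 {
			return nil, errors.New("p2p state provider needs at least two peers")
		}
		return &p2pProvider{peers: peers}, nil
	}
	if len(cfg.RPCServers) == 0 {
		return nil, errors.New("rpc state provider needs at least one server")
	}
	return &rpcProvider{servers: cfg.RPCServers}, nil
}

func main() {
	sp, err := initStateProvider(config{UseP2P: true}, []string{"a", "b"})
	if err != nil {
		panic(err)
	}
	fmt.Println(sp.Name())
}
```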
@@ -19,7 +19,7 @@ import (
"github.com/tendermint/tendermint/internal/statesync/mocks"
"github.com/tendermint/tendermint/internal/test/factory"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/light"
// "github.com/tendermint/tendermint/light"
"github.com/tendermint/tendermint/light/provider"
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
@@ -142,6 +142,8 @@ func setup(
cfg := config.DefaultStateSyncConfig()

rts.reactor = NewReactor(
factory.DefaultTestChainID,
1,
*cfg,
log.TestingLogger(),
conn,
@@ -442,19 +444,20 @@ func TestReactor_BlockProviders(t *testing.T) {

}

func TestReactor_P2P_Provider(t *testing.T) {
func TestReactor_StateProviderP2P(t *testing.T) {
rts := setup(t, nil, nil, nil, 2)
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength)),
Status: p2p.PeerStatusUp,
}
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: types.NodeID(strings.Repeat("b", 2*types.NodeIDByteLength)),
Status: p2p.PeerStatusUp,
}

// make syncer non nil else test won't think we are state syncing
rts.reactor.syncer = rts.syncer
peerA := types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength))
peerB := types.NodeID(strings.Repeat("b", 2*types.NodeIDByteLength))
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: peerA,
Status: p2p.PeerStatusUp,
}
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: peerB,
Status: p2p.PeerStatusUp,
}

closeCh := make(chan struct{})
defer close(closeCh)
@@ -463,42 +466,32 @@ func TestReactor_P2P_Provider(t *testing.T) {
go handleLightBlockRequests(t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
go handleConsensusParamsRequest(t, rts.paramsOutCh, rts.paramsInCh, closeCh)

// we now test the p2p state provider
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
peers := rts.reactor.peers.All()
require.Len(t, peers, 2)
lb, err := rts.reactor.dispatcher.LightBlock(ctx, 2, peers[0])
rts.reactor.cfg.UseP2P = true
rts.reactor.cfg.TrustHeight = 1
rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())
ctx := context.Background()
err := rts.reactor.initStateProvider(ctx, factory.DefaultTestChainID, 1)
require.NoError(t, err)
to := light.TrustOptions{
Period: 24 * time.Hour,
Height: lb.Height,
Hash: lb.Hash(),
}
rts.reactor.syncer.stateProvider = rts.reactor.stateProvider

providers := make([]provider.Provider, len(peers))
for idx, peer := range peers {
providers[idx] = NewBlockProvider(peer, factory.DefaultTestChainID, rts.reactor.dispatcher)
}

p2pStateProvider, err := NewP2PStateProvider(ctx, "test-chain", 1, providers,
to, rts.reactor.paramsCh.Out, log.TestingLogger())
require.NoError(t, err)
// set the state provider else the test won't think we are state syncing
rts.reactor.syncer = rts.syncer
rts.syncer.stateProvider = p2pStateProvider

appHash, err := p2pStateProvider.AppHash(ctx, 5)
appHash, err := rts.reactor.stateProvider.AppHash(ctx, 5)
require.NoError(t, err)
require.Len(t, appHash, 32)

state, err := p2pStateProvider.State(ctx, 5)
state, err := rts.reactor.stateProvider.State(ctx, 5)
require.NoError(t, err)
require.Equal(t, appHash, state.AppHash)
require.Equal(t, types.DefaultConsensusParams(), &state.ConsensusParams)

commit, err := p2pStateProvider.Commit(ctx, 5)
commit, err := rts.reactor.stateProvider.Commit(ctx, 5)
require.NoError(t, err)
require.Equal(t, commit.BlockID, state.LastBlockID)

added, err := rts.reactor.syncer.AddSnapshot(peerA, &snapshot{
Height: 1, Format: 2, Chunks: 7, Hash: []byte{1, 2}, Metadata: []byte{1},
})
require.NoError(t, err)
require.True(t, added)
}

func TestReactor_Backfill(t *testing.T) {
@@ -1,13 +1,11 @@
package statesync

import (
"context"
"crypto/sha256"
"fmt"
"math/rand"
"sort"
"strings"
"time"

tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/types"
@@ -43,8 +41,6 @@ func (s *snapshot) Key() snapshotKey {

// snapshotPool discovers and aggregates snapshots across peers.
type snapshotPool struct {
stateProvider StateProvider

tmsync.Mutex
snapshots map[snapshotKey]*snapshot
snapshotPeers map[snapshotKey]map[types.NodeID]types.NodeID
@@ -61,9 +57,8 @@ type snapshotPool struct {
}

// newSnapshotPool creates a new snapshot pool. The state source is used for
func newSnapshotPool(stateProvider StateProvider) *snapshotPool {
func newSnapshotPool() *snapshotPool {
return &snapshotPool{
stateProvider: stateProvider,
snapshots: make(map[snapshotKey]*snapshot),
snapshotPeers: make(map[snapshotKey]map[types.NodeID]types.NodeID),
formatIndex: make(map[uint32]map[snapshotKey]bool),
@@ -80,14 +75,6 @@ func newSnapshotPool(stateProvider StateProvider) *snapshotPool {
// snapshot height is verified using the light client, and the expected app hash
// is set for the snapshot.
func (p *snapshotPool) Add(peerID types.NodeID, snapshot *snapshot) (bool, error) {
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
defer cancel()

appHash, err := p.stateProvider.AppHash(ctx, snapshot.Height)
if err != nil {
return false, fmt.Errorf("failed to get app hash: %w", err)
}
snapshot.trustedAppHash = appHash
key := snapshot.Key()

p.Lock()
@@ -3,10 +3,8 @@ package statesync
import (
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/tendermint/tendermint/internal/statesync/mocks"
"github.com/tendermint/tendermint/types"
)

@@ -39,13 +37,10 @@ func TestSnapshot_Key(t *testing.T) {
}

func TestSnapshotPool_Add(t *testing.T) {
stateProvider := &mocks.StateProvider{}
stateProvider.On("AppHash", mock.Anything, uint64(1)).Return([]byte("app_hash"), nil)

peerID := types.NodeID("aa")

// Adding to the pool should work
pool := newSnapshotPool(stateProvider)
pool := newSnapshotPool()
added, err := pool.Add(peerID, &snapshot{
Height: 1,
Format: 1,
@@ -66,18 +61,12 @@ func TestSnapshotPool_Add(t *testing.T) {
require.NoError(t, err)
require.False(t, added)

// The pool should have populated the snapshot with the trusted app hash
snapshot := pool.Best()
require.NotNil(t, snapshot)
require.Equal(t, []byte("app_hash"), snapshot.trustedAppHash)

stateProvider.AssertExpectations(t)
}

func TestSnapshotPool_GetPeer(t *testing.T) {
stateProvider := &mocks.StateProvider{}
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
pool := newSnapshotPool(stateProvider)
pool := newSnapshotPool()

s := &snapshot{Height: 1, Format: 1, Chunks: 1, Hash: []byte{1}}

@@ -112,9 +101,7 @@ func TestSnapshotPool_GetPeer(t *testing.T) {
}

func TestSnapshotPool_GetPeers(t *testing.T) {
stateProvider := &mocks.StateProvider{}
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
pool := newSnapshotPool(stateProvider)
pool := newSnapshotPool()

s := &snapshot{Height: 1, Format: 1, Chunks: 1, Hash: []byte{1}}

@@ -137,9 +124,7 @@ func TestSnapshotPool_GetPeers(t *testing.T) {
}

func TestSnapshotPool_Ranked_Best(t *testing.T) {
stateProvider := &mocks.StateProvider{}
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
pool := newSnapshotPool(stateProvider)
pool := newSnapshotPool()

// snapshots in expected order (best to worst). Highest height wins, then highest format.
// Snapshots with different chunk hashes are considered different, and the most peers is
@@ -182,9 +167,7 @@ func TestSnapshotPool_Ranked_Best(t *testing.T) {
}

func TestSnapshotPool_Reject(t *testing.T) {
stateProvider := &mocks.StateProvider{}
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
pool := newSnapshotPool(stateProvider)
pool := newSnapshotPool()

peerID := types.NodeID("aa")

@@ -212,9 +195,7 @@ func TestSnapshotPool_Reject(t *testing.T) {
}

func TestSnapshotPool_RejectFormat(t *testing.T) {
stateProvider := &mocks.StateProvider{}
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
pool := newSnapshotPool(stateProvider)
pool := newSnapshotPool()

peerID := types.NodeID("aa")

@@ -243,9 +224,7 @@ func TestSnapshotPool_RejectFormat(t *testing.T) {
}

func TestSnapshotPool_RejectPeer(t *testing.T) {
stateProvider := &mocks.StateProvider{}
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
pool := newSnapshotPool(stateProvider)
pool := newSnapshotPool()

peerAID := types.NodeID("aa")
peerBID := types.NodeID("bb")
@@ -285,9 +264,7 @@ func TestSnapshotPool_RejectPeer(t *testing.T) {
}

func TestSnapshotPool_RemovePeer(t *testing.T) {
stateProvider := &mocks.StateProvider{}
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
pool := newSnapshotPool(stateProvider)
pool := newSnapshotPool()

peerAID := types.NodeID("aa")
peerBID := types.NodeID("bb")
@@ -333,12 +333,16 @@ func (s *stateProviderP2P) State(ctx context.Context, height uint64) (sm.State,
return state, nil
}

// addProvider dynamically adds a peer as a new witness. A limit of 6 providers is kept as a
// heuristic. Too many overburdens the network and too few compromises the second layer of security.
func (s *stateProviderP2P) addProvider(p lightprovider.Provider) {
if len(s.lc.Witnesses()) < 6 {
s.lc.AddProvider(p)
}
}

// consensusParams sends out a request for consensus params blocking until one is returned.
// If it fails to get a valid set of consensus params from any of the providers it returns an error.
func (s *stateProviderP2P) consensusParams(ctx context.Context, height int64) (types.ConsensusParams, error) {
providers := s.lc.Witnesses()
for _, provider := range providers {
@@ -84,7 +84,7 @@ func newSyncer(
stateProvider: stateProvider,
conn: conn,
connQuery: connQuery,
snapshots: newSnapshotPool(stateProvider),
snapshots: newSnapshotPool(),
snapshotCh: snapshotCh,
chunkCh: chunkCh,
tempDir: tempDir,
@@ -118,6 +118,19 @@ func (s *syncer) AddChunk(chunk *chunk) (bool, error) {
// AddSnapshot adds a snapshot to the snapshot pool. It returns true if a new, previously unseen
// snapshot was accepted and added.
func (s *syncer) AddSnapshot(peerID types.NodeID, snapshot *snapshot) (bool, error) {
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
defer cancel()

// Fetch the app hash corresponding to the snapshot
// TODO: We do all this computation for each and every snapshot we receive (even after
// attempting to fetch the chunks and restore state). We should only do this when a snapshot
// is selected and the actual sync begins
appHash, err := s.stateProvider.AppHash(ctx, snapshot.Height)
if err != nil {
return false, fmt.Errorf("failed to get app hash: %w", err)
}
snapshot.trustedAppHash = appHash

added, err := s.snapshots.Add(peerID, snapshot)
if err != nil {
return false, err
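With the state provider removed from the snapshot pool, the syncer now fetches the trusted app hash itself, under a bounded timeout, before a snapshot is admitted. A rough standalone sketch of that ordering, using a plain function in place of StateProvider.AppHash and a map in place of the real pool:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type snapshot struct {
	Height         uint64
	trustedAppHash []byte
}

// appHasher is a stand-in for the StateProvider.AppHash dependency that the
// diff moves out of the snapshot pool and into the syncer.
type appHasher func(ctx context.Context, height uint64) ([]byte, error)

// addSnapshot stamps the snapshot with its trusted app hash before handing it
// to a (here, trivial) pool; the 30s bound mirrors the timeout above.
func addSnapshot(hash appHasher, pool map[uint64]*snapshot, s *snapshot) (bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	h, err := hash(ctx, s.Height)
	if err != nil {
		return false, fmt.Errorf("failed to get app hash: %w", err)
	}
	s.trustedAppHash = h

	if _, seen := pool[s.Height]; seen {
		return false, nil // previously seen
	}
	pool[s.Height] = s
	return true, nil
}

func main() {
	pool := map[uint64]*snapshot{}
	hash := func(ctx context.Context, h uint64) ([]byte, error) { return []byte("app_hash"), nil }
	added, err := addSnapshot(hash, pool, &snapshot{Height: 1})
	fmt.Println(added, err)
}
```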
@@ -989,7 +989,7 @@ func (c *Client) findNewPrimary(ctx context.Context, height int64, remove bool)
c.providerMutex.Lock()
defer c.providerMutex.Unlock()

if len(c.witnesses) <= 1 {
if len(c.witnesses) < 1 {
return nil, ErrNoWitnesses
}

@@ -333,6 +333,8 @@ func makeNode(config *cfg.Config,
}

stateSyncReactor = statesync.NewReactor(
genDoc.ChainID,
genDoc.InitialHeight,
*config.StateSync,
stateSyncReactorShim.Logger,
proxyApp.Snapshot(),

@@ -5,8 +5,6 @@ snapshot_interval = 3
[node.validator03]
snapshot_interval = 3
[node.validator04]
snapshot_interval = 3
[node.validator05]
state_sync = true
start_at = 5
persistent_peers = ["validator01", "validator02"]
@@ -168,12 +168,21 @@ func NewCLI() *CLI {
},
})

cli.root.AddCommand(&cobra.Command{
Use: "pause",
Short: "Pauses the Docker testnet",
RunE: func(cmd *cobra.Command, args []string) error {
logger.Info("Pausing testnet")
return execCompose(cli.testnet.Dir, "pause")
},
})

cli.root.AddCommand(&cobra.Command{
Use: "resume",
Short: "Resumes the Docker testnet",
RunE: func(cmd *cobra.Command, args []string) error {
logger.Info("Resuming testnet")
return execCompose(cli.testnet.Dir, "up")
return execCompose(cli.testnet.Dir, "unpause")
},
})