This is (#8446) pulled from the `main/libp2p` branch, but without any of the libp2p content. It is perhaps the easiest first step toward pluggability at the peer layer, and it makes it possible to hoist shims (including for, say, 0.34) into tendermint without touching the reactors.
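The test below exercises exactly that seam: `makeReactor` receives a `p2p.ChannelCreator` and a `p2p.PeerEventSubscriber` rather than a concrete router, so any peer-layer shim that can hand back a channel and a peer-update stream can drive the reactor. Here is a minimal sketch of such a shim, written as if it sat alongside the test; the `wireReactor` name and its parameter list are illustrative, not part of this file, and the types simply mirror how the test itself calls `NewReactor`.

```go
// wireReactor is a hypothetical helper (not part of this file) showing the
// pluggable seam: the reactor only sees two closures, so the channel and
// peer-update plumbing can come from any shim without touching the reactor.
func wireReactor(
	logger log.Logger,
	stateStore sm.Store,
	blockExec *sm.BlockExecutor,
	blockStore *store.BlockStore,
	ch p2p.Channel,
	updates *p2p.PeerUpdates,
) *Reactor {
	// Hand back a pre-built channel instead of asking a router to open one,
	// exactly as addNode does below with rts.blockSyncChannels.
	chCreator := func(ctx context.Context, desc *p2p.ChannelDescriptor) (p2p.Channel, error) {
		return ch, nil
	}
	// Surface peer up/down events from whatever peer layer is in use.
	peerEvents := func(ctx context.Context) *p2p.PeerUpdates { return updates }

	return NewReactor(
		logger,
		stateStore,
		blockExec,
		blockStore,
		nil,
		chCreator,
		peerEvents,
		true, // block sync enabled, as in makeReactor below
		consensus.NopMetrics(),
		nil, // eventbus, can be nil
	)
}
```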
package blocksync

import (
	"context"
	"os"
	"testing"
	"time"

	"github.com/fortytw2/leaktest"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
	dbm "github.com/tendermint/tm-db"

	abciclient "github.com/tendermint/tendermint/abci/client"
	abci "github.com/tendermint/tendermint/abci/types"
	"github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/internal/consensus"
	"github.com/tendermint/tendermint/internal/eventbus"
	mpmocks "github.com/tendermint/tendermint/internal/mempool/mocks"
	"github.com/tendermint/tendermint/internal/p2p"
	"github.com/tendermint/tendermint/internal/p2p/p2ptest"
	"github.com/tendermint/tendermint/internal/proxy"
	sm "github.com/tendermint/tendermint/internal/state"
	sf "github.com/tendermint/tendermint/internal/state/test/factory"
	"github.com/tendermint/tendermint/internal/store"
	"github.com/tendermint/tendermint/internal/test/factory"
	"github.com/tendermint/tendermint/libs/log"
	bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync"
	"github.com/tendermint/tendermint/types"
)

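// reactorTestSuite bundles the in-memory test network together with the
// per-node reactors, ABCI clients, block sync channels, and peer-update
// plumbing used by the tests in this file.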
type reactorTestSuite struct {
	network *p2ptest.Network
	logger  log.Logger
	nodes   []types.NodeID

	reactors map[types.NodeID]*Reactor
	app      map[types.NodeID]abciclient.Client

	blockSyncChannels map[types.NodeID]p2p.Channel
	peerChans         map[types.NodeID]chan p2p.PeerUpdate
	peerUpdates       map[types.NodeID]*p2p.PeerUpdates
}

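// setup creates a reactorTestSuite with one node per entry in maxBlockHeights,
// seeding each node's block store up to its corresponding height via addNode.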
func setup(
	ctx context.Context,
	t *testing.T,
	genDoc *types.GenesisDoc,
	privVal types.PrivValidator,
	maxBlockHeights []int64,
) *reactorTestSuite {
	t.Helper()

	var cancel context.CancelFunc
	ctx, cancel = context.WithCancel(ctx)

	numNodes := len(maxBlockHeights)
	require.True(t, numNodes >= 1,
		"must specify at least one block height (nodes)")

	rts := &reactorTestSuite{
		logger:            log.NewNopLogger().With("module", "block_sync", "testCase", t.Name()),
		network:           p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}),
		nodes:             make([]types.NodeID, 0, numNodes),
		reactors:          make(map[types.NodeID]*Reactor, numNodes),
		app:               make(map[types.NodeID]abciclient.Client, numNodes),
		blockSyncChannels: make(map[types.NodeID]p2p.Channel, numNodes),
		peerChans:         make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
		peerUpdates:       make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
	}

	chDesc := &p2p.ChannelDescriptor{ID: BlockSyncChannel, MessageType: new(bcproto.Message)}
	rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc)

	i := 0
	for nodeID := range rts.network.Nodes {
		rts.addNode(ctx, t, nodeID, genDoc, privVal, maxBlockHeights[i])
		i++
	}

	t.Cleanup(func() {
		cancel()
		for _, nodeID := range rts.nodes {
			if rts.reactors[nodeID].IsRunning() {
				rts.reactors[nodeID].Wait()
				rts.app[nodeID].Wait()

				require.False(t, rts.reactors[nodeID].IsRunning())
			}
		}
	})
	t.Cleanup(leaktest.Check(t))

	return rts
}

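// makeReactor constructs a blocksync Reactor backed by in-memory state and
// block stores, a no-op ABCI application, and a mocked mempool.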
func makeReactor(
	ctx context.Context,
	t *testing.T,
	nodeID types.NodeID,
	genDoc *types.GenesisDoc,
	privVal types.PrivValidator,
	channelCreator p2p.ChannelCreator,
	peerEvents p2p.PeerEventSubscriber) *Reactor {

	logger := log.NewNopLogger()

	app := proxy.New(abciclient.NewLocalClient(logger, &abci.BaseApplication{}), logger, proxy.NopMetrics())
	require.NoError(t, app.Start(ctx))

	blockDB := dbm.NewMemDB()
	stateDB := dbm.NewMemDB()
	stateStore := sm.NewStore(stateDB)
	blockStore := store.NewBlockStore(blockDB)

	state, err := sm.MakeGenesisState(genDoc)
	require.NoError(t, err)
	require.NoError(t, stateStore.Save(state))
	mp := &mpmocks.Mempool{}
	mp.On("Lock").Return()
	mp.On("Unlock").Return()
	mp.On("FlushAppConn", mock.Anything).Return(nil)
	mp.On("Update",
		mock.Anything,
		mock.Anything,
		mock.Anything,
		mock.Anything,
		mock.Anything,
		mock.Anything,
		mock.Anything).Return(nil)

	eventbus := eventbus.NewDefault(logger)
	require.NoError(t, eventbus.Start(ctx))

	blockExec := sm.NewBlockExecutor(
		stateStore,
		log.NewNopLogger(),
		app,
		mp,
		sm.EmptyEvidencePool{},
		blockStore,
		eventbus,
		sm.NopMetrics(),
	)

	return NewReactor(
		logger,
		stateStore,
		blockExec,
		blockStore,
		nil,
		channelCreator,
		peerEvents,
		true,
		consensus.NopMetrics(),
		nil, // eventbus, can be nil
	)
}

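// addNode wires a reactor for nodeID into the test network, applies and stores
// maxBlockHeight blocks, and starts the reactor.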
func (rts *reactorTestSuite) addNode(
	ctx context.Context,
	t *testing.T,
	nodeID types.NodeID,
	genDoc *types.GenesisDoc,
	privVal types.PrivValidator,
	maxBlockHeight int64,
) {
	t.Helper()

	logger := log.NewNopLogger()

	rts.nodes = append(rts.nodes, nodeID)
	rts.app[nodeID] = proxy.New(abciclient.NewLocalClient(logger, &abci.BaseApplication{}), logger, proxy.NopMetrics())
	require.NoError(t, rts.app[nodeID].Start(ctx))

	rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
	rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
	rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])

	chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (p2p.Channel, error) {
		return rts.blockSyncChannels[nodeID], nil
	}

	peerEvents := func(ctx context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] }
	reactor := makeReactor(ctx, t, nodeID, genDoc, privVal, chCreator, peerEvents)

	lastExtCommit := &types.ExtendedCommit{}

	state, err := reactor.stateStore.Load()
	require.NoError(t, err)
	for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
		block, blockID, partSet, seenExtCommit := makeNextBlock(ctx, t, state, privVal, blockHeight, lastExtCommit)

		state, err = reactor.blockExec.ApplyBlock(ctx, state, blockID, block)
		require.NoError(t, err)

		reactor.store.SaveBlockWithExtendedCommit(block, partSet, seenExtCommit)
		lastExtCommit = seenExtCommit
	}

	rts.reactors[nodeID] = reactor
	require.NoError(t, reactor.Start(ctx))
	require.True(t, reactor.IsRunning())
}

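// makeNextBlock builds the block for the given height on top of state and
// returns it along with its ID, part set, and a simulated extended commit.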
func makeNextBlock(ctx context.Context,
	t *testing.T,
	state sm.State,
	signer types.PrivValidator,
	height int64,
	lc *types.ExtendedCommit) (*types.Block, types.BlockID, *types.PartSet, *types.ExtendedCommit) {

	lastExtCommit := lc.Clone()

	block := sf.MakeBlock(state, height, lastExtCommit.ToCommit())
	partSet, err := block.MakePartSet(types.BlockPartSizeBytes)
	require.NoError(t, err)
	blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: partSet.Header()}

	// Simulate a commit for the current height
	vote, err := factory.MakeVote(
		ctx,
		signer,
		block.Header.ChainID,
		0,
		block.Header.Height,
		0,
		2,
		blockID,
		time.Now(),
	)
	require.NoError(t, err)
	seenExtCommit := &types.ExtendedCommit{
		Height:             vote.Height,
		Round:              vote.Round,
		BlockID:            blockID,
		ExtendedSignatures: []types.ExtendedCommitSig{vote.ExtendedCommitSig()},
	}
	return block, blockID, partSet, seenExtCommit

}

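// start launches the test network and verifies that every node sees the
// expected number of peers before the tests proceed.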
func (rts *reactorTestSuite) start(ctx context.Context, t *testing.T) {
	t.Helper()
	rts.network.Start(ctx, t)
	require.Len(t,
		rts.network.RandomNode().PeerManager.Peers(),
		len(rts.nodes)-1,
		"network does not have expected number of nodes")
}

func TestReactor_AbruptDisconnect(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cfg, err := config.ResetTestRoot(t.TempDir(), "block_sync_reactor_test")
	require.NoError(t, err)
	defer os.RemoveAll(cfg.RootDir)

	valSet, privVals := factory.ValidatorSet(ctx, t, 1, 30)
	genDoc := factory.GenesisDoc(cfg, time.Now(), valSet.Validators, factory.ConsensusParams())
	maxBlockHeight := int64(64)

	rts := setup(ctx, t, genDoc, privVals[0], []int64{maxBlockHeight, 0})

	require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height())

	rts.start(ctx, t)

	secondaryPool := rts.reactors[rts.nodes[1]].pool

	require.Eventually(
		t,
		func() bool {
			height, _, _ := secondaryPool.GetStatus()
			return secondaryPool.MaxPeerHeight() > 0 && height > 0 && height < 10
		},
		10*time.Second,
		10*time.Millisecond,
		"expected node to be partially synced",
	)

	// Remove synced node from the syncing node which should not result in any
	// deadlocks or race conditions within the context of poolRoutine.
	rts.peerChans[rts.nodes[1]] <- p2p.PeerUpdate{
		Status: p2p.PeerStatusDown,
		NodeID: rts.nodes[0],
	}
	rts.network.Nodes[rts.nodes[1]].PeerManager.Disconnected(ctx, rts.nodes[0])
}

func TestReactor_SyncTime(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cfg, err := config.ResetTestRoot(t.TempDir(), "block_sync_reactor_test")
	require.NoError(t, err)
	defer os.RemoveAll(cfg.RootDir)

	valSet, privVals := factory.ValidatorSet(ctx, t, 1, 30)
	genDoc := factory.GenesisDoc(cfg, time.Now(), valSet.Validators, factory.ConsensusParams())
	maxBlockHeight := int64(101)

	rts := setup(ctx, t, genDoc, privVals[0], []int64{maxBlockHeight, 0})
	require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height())
	rts.start(ctx, t)

	require.Eventually(
		t,
		func() bool {
			return rts.reactors[rts.nodes[1]].GetRemainingSyncTime() > time.Nanosecond &&
				rts.reactors[rts.nodes[1]].pool.getLastSyncRate() > 0.001
		},
		10*time.Second,
		10*time.Millisecond,
		"expected node to be partially synced",
	)
}

func TestReactor_NoBlockResponse(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cfg, err := config.ResetTestRoot(t.TempDir(), "block_sync_reactor_test")
	require.NoError(t, err)
	defer os.RemoveAll(cfg.RootDir)

	valSet, privVals := factory.ValidatorSet(ctx, t, 1, 30)
	genDoc := factory.GenesisDoc(cfg, time.Now(), valSet.Validators, factory.ConsensusParams())
	maxBlockHeight := int64(65)

	rts := setup(ctx, t, genDoc, privVals[0], []int64{maxBlockHeight, 0})

	require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height())

	rts.start(ctx, t)

	testCases := []struct {
		height   int64
		existent bool
	}{
		{maxBlockHeight + 2, false},
		{10, true},
		{1, true},
		{100, false},
	}

	secondaryPool := rts.reactors[rts.nodes[1]].pool
	require.Eventually(
		t,
		func() bool { return secondaryPool.MaxPeerHeight() > 0 && secondaryPool.IsCaughtUp() },
		10*time.Second,
		10*time.Millisecond,
		"expected node to be fully synced",
	)

	for _, tc := range testCases {
		block := rts.reactors[rts.nodes[1]].store.LoadBlock(tc.height)
		if tc.existent {
			require.True(t, block != nil)
		} else {
			require.Nil(t, block)
		}
	}
}

func TestReactor_BadBlockStopsPeer(t *testing.T) {
	// Ultimately, this should be refactored to be less integration test oriented
	// and more unit test oriented by simply testing channel sends and receives.
	// See: https://github.com/tendermint/tendermint/issues/6005
	t.SkipNow()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cfg, err := config.ResetTestRoot(t.TempDir(), "block_sync_reactor_test")
	require.NoError(t, err)
	defer os.RemoveAll(cfg.RootDir)

	maxBlockHeight := int64(48)
	valSet, privVals := factory.ValidatorSet(ctx, t, 1, 30)
	genDoc := factory.GenesisDoc(cfg, time.Now(), valSet.Validators, factory.ConsensusParams())

	rts := setup(ctx, t, genDoc, privVals[0], []int64{maxBlockHeight, 0, 0, 0, 0})

	require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height())

	rts.start(ctx, t)

	require.Eventually(
		t,
		func() bool {
			caughtUp := true
			for _, id := range rts.nodes[1 : len(rts.nodes)-1] {
				if rts.reactors[id].pool.MaxPeerHeight() == 0 || !rts.reactors[id].pool.IsCaughtUp() {
					caughtUp = false
				}
			}

			return caughtUp
		},
		10*time.Minute,
		10*time.Millisecond,
		"expected all nodes to be fully synced",
	)

	for _, id := range rts.nodes[:len(rts.nodes)-1] {
		require.Len(t, rts.reactors[id].pool.peers, 3)
	}

	// Mark testSuites[3] as an invalid peer which will cause newSuite to disconnect
	// from this peer.
	//
	// XXX: This causes a potential race condition.
	// See: https://github.com/tendermint/tendermint/issues/6005
	valSet, otherPrivVals := factory.ValidatorSet(ctx, t, 1, 30)
	otherGenDoc := factory.GenesisDoc(cfg, time.Now(), valSet.Validators, factory.ConsensusParams())
	newNode := rts.network.MakeNode(ctx, t, p2ptest.NodeOptions{
		MaxPeers:     uint16(len(rts.nodes) + 1),
		MaxConnected: uint16(len(rts.nodes) + 1),
	})
	rts.addNode(ctx, t, newNode.NodeID, otherGenDoc, otherPrivVals[0], maxBlockHeight)

	// add a fake peer just so we do not wait for the consensus ticker to timeout
	rts.reactors[newNode.NodeID].pool.SetPeerRange("00ff", 10, 10)

	// wait for the new peer to catch up and become fully synced
	require.Eventually(
		t,
		func() bool {
			return rts.reactors[newNode.NodeID].pool.MaxPeerHeight() > 0 && rts.reactors[newNode.NodeID].pool.IsCaughtUp()
		},
		10*time.Minute,
		10*time.Millisecond,
		"expected new node to be fully synced",
	)

	require.Eventuallyf(
		t,
		func() bool { return len(rts.reactors[newNode.NodeID].pool.peers) < len(rts.nodes)-1 },
		10*time.Minute,
		10*time.Millisecond,
		"invalid number of peers; expected < %d, got: %d",
		len(rts.nodes)-1,
		len(rts.reactors[newNode.NodeID].pool.peers),
	)
}

/*
func TestReactorReceivesNoExtendedCommit(t *testing.T) {
	blockDB := dbm.NewMemDB()
	stateDB := dbm.NewMemDB()
	stateStore := sm.NewStore(stateDB)
	blockStore := store.NewBlockStore(blockDB)
	blockExec := sm.NewBlockExecutor(
		stateStore,
		log.NewNopLogger(),
		rts.app[nodeID],
		mp,
		sm.EmptyEvidencePool{},
		blockStore,
		eventbus,
		sm.NopMetrics(),
	)
	NewReactor(
		log.NewNopLogger(),
		stateStore,
		blockExec,
		blockStore,
		nil,
		chCreator,
		func(ctx context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] },
		rts.blockSync,
		consensus.NopMetrics(),
		nil, // eventbus, can be nil
	)

}
*/