evidence: remove source of non-determinism from test (#7266)

The evidence test produces a set of mock evidence in the evidence pool of the 'Primary' node. The test then fills the evidence pools of the secondaries with half of this mock evidence. Finally, the test waits until each secondary's evidence pool is as full as the primary's.

The assertions removed here checked that the primary's and the secondaries' evidence channels were empty. However, nothing in the test actually guarantees that the channels are empty at that point. The test only waits for the secondaries to hold the complete set of evidence, and the secondaries already received half of that evidence at the beginning. It is therefore entirely possible for a secondary to hold the complete set of evidence while it has not yet finished reading the duplicate evidence off the channels.
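To make the window concrete, here is a minimal, self-contained Go sketch of the race (illustrative only; every name below is made up and none of it appears in the reactor test): the wait condition becomes true while the duplicate half of the evidence may still be sitting on the channel, which is exactly the state the removed assertions implicitly relied on never happening.

    // Hypothetical sketch of the timing window; models a buffered "evidence
    // channel", a "pool", and the test's wait loop. Not part of this change.
    package main

    import (
    	"fmt"
    	"sync"
    	"time"
    )

    func main() {
    	const numEvidence = 10

    	ch := make(chan int, numEvidence) // stands in for the reactor's evidence channel
    	var mtx sync.Mutex
    	pool := map[int]bool{} // stands in for the secondary's evidence pool

    	// The secondary already holds the first half of the evidence.
    	for i := 0; i < numEvidence/2; i++ {
    		pool[i] = true
    	}

    	// The primary gossips the complete set; suppose the half that is new to
    	// the secondary happens to arrive before the duplicate half.
    	for i := numEvidence / 2; i < numEvidence; i++ {
    		ch <- i
    	}
    	for i := 0; i < numEvidence/2; i++ {
    		ch <- i
    	}

    	// The secondary drains the channel into its pool.
    	go func() {
    		for ev := range ch {
    			time.Sleep(time.Millisecond) // simulate processing latency
    			mtx.Lock()
    			pool[ev] = true
    			mtx.Unlock()
    		}
    	}()

    	// The test's wait condition: the secondary's pool holds the full set.
    	for {
    		mtx.Lock()
    		n := len(pool)
    		mtx.Unlock()
    		if n == numEvidence {
    			break
    		}
    		time.Sleep(time.Millisecond)
    	}

    	// The pool is complete, yet the duplicate half may still be queued, so
    	// an "evidence channel is empty" assertion taken here is racy.
    	fmt.Printf("pool complete; %d message(s) still queued on the channel\n", len(ch))
    }

Waiting on the pool contents, as the test already does, is the reliable signal; the channel state is something the test never synchronizes on, so it is not asserted.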

(cherry picked from commit 4acd117b5e)

# Conflicts:
#	internal/evidence/reactor_test.go
Author: William Banfield
Date: 2021-11-09 15:48:01 -05:00
Committed by: mergify-bot
Parent: 12e3419f2b
Commit: c31c27ef31


@@ -0,0 +1,562 @@
package evidence_test
import (
"encoding/hex"
"math/rand"
"sync"
"testing"
"time"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/internal/evidence"
"github.com/tendermint/tendermint/internal/evidence/mocks"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/p2ptest"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/libs/log"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
var (
numEvidence = 10
rng = rand.New(rand.NewSource(time.Now().UnixNano()))
)
type reactorTestSuite struct {
network *p2ptest.Network
logger log.Logger
reactors map[types.NodeID]*evidence.Reactor
pools map[types.NodeID]*evidence.Pool
evidenceChannels map[types.NodeID]*p2p.Channel
peerUpdates map[types.NodeID]*p2p.PeerUpdates
peerChans map[types.NodeID]chan p2p.PeerUpdate
nodes []*p2ptest.Node
numStateStores int
}
func setup(t *testing.T, stateStores []sm.Store, chBuf uint) *reactorTestSuite {
t.Helper()
pID := make([]byte, 16)
_, err := rng.Read(pID)
require.NoError(t, err)
numStateStores := len(stateStores)
rts := &reactorTestSuite{
numStateStores: numStateStores,
logger: log.TestingLogger().With("testCase", t.Name()),
network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numStateStores}),
reactors: make(map[types.NodeID]*evidence.Reactor, numStateStores),
pools: make(map[types.NodeID]*evidence.Pool, numStateStores),
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numStateStores),
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numStateStores),
}
chDesc := &p2p.ChannelDescriptor{ID: evidence.EvidenceChannel, MessageType: new(tmproto.EvidenceList)}
rts.evidenceChannels = rts.network.MakeChannelsNoCleanup(t, chDesc)
require.Len(t, rts.network.RandomNode().PeerManager.Peers(), 0)
idx := 0
evidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
for nodeID := range rts.network.Nodes {
logger := rts.logger.With("validator", idx)
evidenceDB := dbm.NewMemDB()
blockStore := &mocks.BlockStore{}
state, _ := stateStores[idx].Load()
blockStore.On("LoadBlockMeta", mock.AnythingOfType("int64")).Return(func(h int64) *types.BlockMeta {
if h <= state.LastBlockHeight {
return &types.BlockMeta{Header: types.Header{Time: evidenceTime}}
}
return nil
})
rts.pools[nodeID], err = evidence.NewPool(logger, evidenceDB, stateStores[idx], blockStore)
require.NoError(t, err)
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID])
rts.nodes = append(rts.nodes, rts.network.Nodes[nodeID])
rts.reactors[nodeID] = evidence.NewReactor(logger,
rts.evidenceChannels[nodeID],
rts.peerUpdates[nodeID],
rts.pools[nodeID])
require.NoError(t, rts.reactors[nodeID].Start())
require.True(t, rts.reactors[nodeID].IsRunning())
idx++
}
t.Cleanup(func() {
for _, r := range rts.reactors {
if r.IsRunning() {
require.NoError(t, r.Stop())
require.False(t, r.IsRunning())
}
}
leaktest.Check(t)
})
return rts
}
func (rts *reactorTestSuite) start(t *testing.T) {
rts.network.Start(t)
require.Len(t,
rts.network.RandomNode().PeerManager.Peers(),
rts.numStateStores-1,
"network does not have expected number of nodes")
}
func (rts *reactorTestSuite) waitForEvidence(t *testing.T, evList types.EvidenceList, ids ...types.NodeID) {
t.Helper()
fn := func(pool *evidence.Pool) {
var (
localEvList []types.Evidence
size int64
loops int
)
// wait till we have at least the amount of evidence
// that we expect. if there's more local evidence then
// it doesn't make sense to wait longer and a
// different assertion should catch the resulting error
for len(localEvList) < len(evList) {
// each evidence should not be more than 500 bytes
localEvList, size = pool.PendingEvidence(int64(len(evList) * 500))
if loops == 100 {
t.Log("current wait status:", "|",
"local", len(localEvList), "|",
"waitlist", len(evList), "|",
"size", size)
}
loops++
}
// put the reaped evidence in a map so we can quickly check we got everything
evMap := make(map[string]types.Evidence)
for _, e := range localEvList {
evMap[string(e.Hash())] = e
}
for i, expectedEv := range evList {
gotEv := evMap[string(expectedEv.Hash())]
require.Equalf(
t,
expectedEv,
gotEv,
"evidence for pool %d in pool does not match; got: %v, expected: %v", i, gotEv, expectedEv,
)
}
}
if len(ids) == 1 {
// special case waiting once, just to avoid the extra
// goroutine, in the case that this hits a timeout,
// the stack will be clearer.
fn(rts.pools[ids[0]])
return
}
wg := sync.WaitGroup{}
for id := range rts.pools {
if len(ids) > 0 && !p2ptest.NodeInSlice(id, ids) {
// if an ID list is specified, then we only
// want to wait for those pools that are
// specified in the list, otherwise, wait for
// all pools.
continue
}
wg.Add(1)
go func(id types.NodeID) { defer wg.Done(); fn(rts.pools[id]) }(id)
}
wg.Wait()
}
func (rts *reactorTestSuite) assertEvidenceChannelsEmpty(t *testing.T) {
t.Helper()
for id, r := range rts.reactors {
require.NoError(t, r.Stop(), "stopping reactor #%s", id)
r.Wait()
require.False(t, r.IsRunning(), "reactor #%d did not stop", id)
}
for id, ech := range rts.evidenceChannels {
require.Empty(t, ech.Out, "checking channel #%q", id)
}
}
func createEvidenceList(
t *testing.T,
pool *evidence.Pool,
val types.PrivValidator,
numEvidence int,
) types.EvidenceList {
t.Helper()
evList := make([]types.Evidence, numEvidence)
for i := 0; i < numEvidence; i++ {
ev := types.NewMockDuplicateVoteEvidenceWithValidator(
int64(i+1),
time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC),
val,
evidenceChainID,
)
require.NoError(t, pool.AddEvidence(ev),
"adding evidence it#%d of %d to pool with height %d",
i, numEvidence, pool.State().LastBlockHeight)
evList[i] = ev
}
return evList
}
func TestReactorMultiDisconnect(t *testing.T) {
val := types.NewMockPV()
height := int64(numEvidence) + 10
stateDB1 := initializeValidatorState(t, val, height)
stateDB2 := initializeValidatorState(t, val, height)
rts := setup(t, []sm.Store{stateDB1, stateDB2}, 20)
primary := rts.nodes[0]
secondary := rts.nodes[1]
_ = createEvidenceList(t, rts.pools[primary.NodeID], val, numEvidence)
require.Equal(t, primary.PeerManager.Status(secondary.NodeID), p2p.PeerStatusDown)
rts.start(t)
require.Equal(t, primary.PeerManager.Status(secondary.NodeID), p2p.PeerStatusUp)
// Ensure "disconnecting" the secondary peer from the primary more than once
// is handled gracefully.
primary.PeerManager.Disconnected(secondary.NodeID)
require.Equal(t, primary.PeerManager.Status(secondary.NodeID), p2p.PeerStatusDown)
_, err := primary.PeerManager.TryEvictNext()
require.NoError(t, err)
primary.PeerManager.Disconnected(secondary.NodeID)
require.Equal(t, primary.PeerManager.Status(secondary.NodeID), p2p.PeerStatusDown)
require.Equal(t, secondary.PeerManager.Status(primary.NodeID), p2p.PeerStatusUp)
}
// TestReactorBroadcastEvidence creates an environment of multiple peers that
// are all at the same height. One peer, designated as a primary, gossips all
// evidence to the remaining peers.
func TestReactorBroadcastEvidence(t *testing.T) {
numPeers := 7
// create a stateDB for all test suites (nodes)
stateDBs := make([]sm.Store, numPeers)
val := types.NewMockPV()
// We need all validators saved for heights at least as high as we have
// evidence for.
height := int64(numEvidence) + 10
for i := 0; i < numPeers; i++ {
stateDBs[i] = initializeValidatorState(t, val, height)
}
rts := setup(t, stateDBs, 0)
rts.start(t)
// Create a series of fixtures where each suite contains a reactor and
// evidence pool. In addition, we mark a primary suite and the rest are
// secondaries where each secondary is added as a peer via a PeerUpdate to the
// primary. As a result, the primary will gossip all evidence to each secondary.
primary := rts.network.RandomNode()
secondaries := make([]*p2ptest.Node, 0, len(rts.network.NodeIDs())-1)
secondaryIDs := make([]types.NodeID, 0, cap(secondaries))
for id := range rts.network.Nodes {
if id == primary.NodeID {
continue
}
secondaries = append(secondaries, rts.network.Nodes[id])
secondaryIDs = append(secondaryIDs, id)
}
evList := createEvidenceList(t, rts.pools[primary.NodeID], val, numEvidence)
// Add each secondary suite (node) as a peer to the primary suite (node). This
// will cause the primary to gossip all evidence to the secondaries.
for _, suite := range secondaries {
rts.peerChans[primary.NodeID] <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
NodeID: suite.NodeID,
}
}
// Wait till all secondary suites (reactors) have received all evidence from
// the primary suite (node).
rts.waitForEvidence(t, evList, secondaryIDs...)
for _, pool := range rts.pools {
require.Equal(t, numEvidence, int(pool.Size()))
}
rts.assertEvidenceChannelsEmpty(t)
}
// TestReactorBroadcastEvidence_Lagging tests a context where we have two
// reactors connected to one another but at different heights. Reactor 1,
// which is ahead, receives a list of evidence.
func TestReactorBroadcastEvidence_Lagging(t *testing.T) {
val := types.NewMockPV()
height1 := int64(numEvidence) + 10
height2 := int64(numEvidence) / 2
// stateDB1 is ahead of stateDB2, where stateDB1 has all heights (1-20) and
// stateDB2 only has heights 1-5.
stateDB1 := initializeValidatorState(t, val, height1)
stateDB2 := initializeValidatorState(t, val, height2)
rts := setup(t, []sm.Store{stateDB1, stateDB2}, 100)
rts.start(t)
primary := rts.nodes[0]
secondary := rts.nodes[1]
// Send a list of valid evidence to the evidence pool of the first reactor,
// the one that is ahead.
evList := createEvidenceList(t, rts.pools[primary.NodeID], val, numEvidence)
// Add the secondary suite (node) as a peer to the primary suite (node). This
// will cause the primary to gossip all evidence to the secondary.
rts.peerChans[primary.NodeID] <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
NodeID: secondary.NodeID,
}
// only evidence at heights at or below the peer's height should make it through
rts.waitForEvidence(t, evList[:height2], secondary.NodeID)
require.Equal(t, numEvidence, int(rts.pools[primary.NodeID].Size()))
require.Equal(t, int(height2), int(rts.pools[secondary.NodeID].Size()))
rts.assertEvidenceChannelsEmpty(t)
}
func TestReactorBroadcastEvidence_Pending(t *testing.T) {
val := types.NewMockPV()
height := int64(10)
stateDB1 := initializeValidatorState(t, val, height)
stateDB2 := initializeValidatorState(t, val, height)
rts := setup(t, []sm.Store{stateDB1, stateDB2}, 100)
primary := rts.nodes[0]
secondary := rts.nodes[1]
evList := createEvidenceList(t, rts.pools[primary.NodeID], val, numEvidence)
// Manually add half the evidence to the secondary which will mark them as
// pending.
for i := 0; i < numEvidence/2; i++ {
require.NoError(t, rts.pools[secondary.NodeID].AddEvidence(evList[i]))
}
// the secondary should have half the evidence as pending
require.Equal(t, numEvidence/2, int(rts.pools[secondary.NodeID].Size()))
rts.start(t)
// The secondary reactor should have received all the evidence ignoring the
// already pending evidence.
rts.waitForEvidence(t, evList, secondary.NodeID)
// check to make sure that all of the evidence has propagated
require.Len(t, rts.pools, 2)
assert.EqualValues(t, numEvidence, rts.pools[primary.NodeID].Size(),
"primary node should have all the evidence")
assert.EqualValues(t, numEvidence, rts.pools[secondary.NodeID].Size(),
"secondary nodes should have caught up")
}
func TestReactorBroadcastEvidence_Committed(t *testing.T) {
val := types.NewMockPV()
height := int64(10)
stateDB1 := initializeValidatorState(t, val, height)
stateDB2 := initializeValidatorState(t, val, height)
rts := setup(t, []sm.Store{stateDB1, stateDB2}, 0)
primary := rts.nodes[0]
secondary := rts.nodes[1]
// add all evidence to the primary reactor
evList := createEvidenceList(t, rts.pools[primary.NodeID], val, numEvidence)
// Manually add half the evidence to the secondary which will mark them as
// pending.
for i := 0; i < numEvidence/2; i++ {
require.NoError(t, rts.pools[secondary.NodeID].AddEvidence(evList[i]))
}
// the secondary should have half the evidence as pending
require.Equal(t, numEvidence/2, int(rts.pools[secondary.NodeID].Size()))
state, err := stateDB2.Load()
require.NoError(t, err)
// update the secondary's pool such that all pending evidence is committed
state.LastBlockHeight++
rts.pools[secondary.NodeID].Update(state, evList[:numEvidence/2])
// with that half committed, the secondary's pending pool should now be empty
require.Equal(t, 0, int(rts.pools[secondary.NodeID].Size()))
// start the network and ensure it's configured
rts.start(t)
// The secondary reactor should have received all the evidence ignoring the
// already committed evidence.
rts.waitForEvidence(t, evList[numEvidence/2:], secondary.NodeID)
require.Len(t, rts.pools, 2)
assert.EqualValues(t, numEvidence, rts.pools[primary.NodeID].Size(),
"primary node should have all the evidence")
assert.EqualValues(t, numEvidence/2, rts.pools[secondary.NodeID].Size(),
"secondary nodes should have caught up")
}
func TestReactorBroadcastEvidence_FullyConnected(t *testing.T) {
numPeers := 7
// create a stateDB for all test suites (nodes)
stateDBs := make([]sm.Store, numPeers)
val := types.NewMockPV()
// We need all validators saved for heights at least as high as we have
// evidence for.
height := int64(numEvidence) + 10
for i := 0; i < numPeers; i++ {
stateDBs[i] = initializeValidatorState(t, val, height)
}
rts := setup(t, stateDBs, 0)
rts.start(t)
evList := createEvidenceList(t, rts.pools[rts.network.RandomNode().NodeID], val, numEvidence)
// every suite (reactor) connects to every other suite (reactor)
for outerID, outerChan := range rts.peerChans {
for innerID := range rts.peerChans {
if outerID != innerID {
outerChan <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
NodeID: innerID,
}
}
}
}
// wait till all suites (reactors) have received all evidence from the other suites (reactors)
rts.waitForEvidence(t, evList)
for _, pool := range rts.pools {
require.Equal(t, numEvidence, int(pool.Size()))
// commit state so we do not continue to repeat gossiping the same evidence
state := pool.State()
state.LastBlockHeight++
pool.Update(state, evList)
}
}
// nolint:lll
func TestEvidenceListSerialization(t *testing.T) {
exampleVote := func(msgType byte) *types.Vote {
var stamp, err = time.Parse(types.TimeFormat, "2017-12-25T03:00:01.234Z")
require.NoError(t, err)
return &types.Vote{
Type: tmproto.SignedMsgType(msgType),
Height: 3,
Round: 2,
Timestamp: stamp,
BlockID: types.BlockID{
Hash: tmhash.Sum([]byte("blockID_hash")),
PartSetHeader: types.PartSetHeader{
Total: 1000000,
Hash: tmhash.Sum([]byte("blockID_part_set_header_hash")),
},
},
ValidatorAddress: crypto.AddressHash([]byte("validator_address")),
ValidatorIndex: 56789,
}
}
val := &types.Validator{
Address: crypto.AddressHash([]byte("validator_address")),
VotingPower: 10,
}
valSet := types.NewValidatorSet([]*types.Validator{val})
dupl, err := types.NewDuplicateVoteEvidence(
exampleVote(1),
exampleVote(2),
defaultEvidenceTime,
valSet,
)
require.NoError(t, err)
testCases := map[string]struct {
evidenceList []types.Evidence
expBytes string
}{
"DuplicateVoteEvidence": {
[]types.Evidence{dupl},
"0a85020a82020a79080210031802224a0a208b01023386c371778ecb6368573e539afc3cc860ec3a2f614e54fe5652f4fc80122608c0843d122072db3d959635dff1bb567bedaa70573392c5159666a3f8caf11e413aac52207a2a0b08b1d381d20510809dca6f32146af1f4111082efb388211bc72c55bcd61e9ac3d538d5bb031279080110031802224a0a208b01023386c371778ecb6368573e539afc3cc860ec3a2f614e54fe5652f4fc80122608c0843d122072db3d959635dff1bb567bedaa70573392c5159666a3f8caf11e413aac52207a2a0b08b1d381d20510809dca6f32146af1f4111082efb388211bc72c55bcd61e9ac3d538d5bb03180a200a2a060880dbaae105",
},
}
for name, tc := range testCases {
tc := tc
t.Run(name, func(t *testing.T) {
protoEv := make([]tmproto.Evidence, len(tc.evidenceList))
for i := 0; i < len(tc.evidenceList); i++ {
ev, err := types.EvidenceToProto(tc.evidenceList[i])
require.NoError(t, err)
protoEv[i] = *ev
}
epl := tmproto.EvidenceList{
Evidence: protoEv,
}
bz, err := epl.Marshal()
require.NoError(t, err)
require.Equal(t, tc.expBytes, hex.EncodeToString(bz))
})
}
}