tendermint/internal/blocksync/v0/reactor_test.go
William Banfield 7bf7ff6639 internal/proxy: add initial set of abci metrics backport (#7342)
* internal/proxy: add initial set of abci metrics (#7115)

This PR adds an initial set of metrics for use with ABCI. The initial metrics enable the calculation of timing histograms and call counts for each of the ABCI methods. The metrics are also labeled as either 'sync' or 'async' to indicate whether the method call was performed using ABCI's `*Async` methods.
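For illustration, here is a minimal, hypothetical sketch of how a histogram with these labels and buckets could be defined and recorded using the Prometheus Go client directly; the names mirror the sample output below, but the actual implementation is wired through Tendermint's own metrics plumbing and may differ:

```go
package abciexample

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// methodTiming is an illustrative stand-in for the metric shown below: a
// per-method timing histogram labeled by chain, method, and sync/async type.
var methodTiming = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Namespace: "tendermint",
	Subsystem: "abci_connection",
	Name:      "method_timing",
	Help:      "Timing of ABCI method calls in seconds.",
	// Buckets span 0.0001s to 25s, matching the range described below.
	Buckets: []float64{0.0001, 0.0004, 0.002, 0.009, 0.02, 0.1, 0.65, 2, 6, 25},
}, []string{"chain_id", "method", "type"})

func init() {
	prometheus.MustRegister(methodTiming)
}

// observeMethod records the duration of a single ABCI call.
func observeMethod(chainID, method, callType string, start time.Time) {
	methodTiming.
		WithLabelValues(chainID, method, callType).
		Observe(time.Since(start).Seconds())
}
```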

An example of these metrics is included here for reference:
```
tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.0001"} 0
tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.0004"} 5
tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.002"} 12
tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.009"} 13
tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.02"} 13
tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.1"} 13
tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.65"} 13
tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="2"} 13
tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="6"} 13
tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="25"} 13
tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="+Inf"} 13
tendermint_abci_connection_method_timing_sum{chain_id="ci",method="commit",type="sync"} 0.007802058000000001
tendermint_abci_connection_method_timing_count{chain_id="ci",method="commit",type="sync"} 13
```

These metrics can easily be graphed using Prometheus's `histogram_quantile(...)` function to pick out a particular quantile to graph or examine. I chose buckets as a rough estimate of the expected range of times for ABCI operations: they start at 0.0001 seconds and range up to 25 seconds. The hope is that this range captures enough possible times to be useful for us and operators.
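For example, a query along the following lines (assuming the metric name shown above) would graph the 95th-percentile latency of sync `commit` calls over a 5-minute window:

```
histogram_quantile(0.95,
  sum by (le) (
    rate(tendermint_abci_connection_method_timing_bucket{method="commit",type="sync"}[5m])
  )
)
```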

* lint++

* docs: add abci timing metrics to the metrics docs (#7311)

* cherry-pick fixup
2021-11-30 10:48:07 -05:00


package v0

import (
	"os"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	dbm "github.com/tendermint/tm-db"

	abciclient "github.com/tendermint/tendermint/abci/client"
	abci "github.com/tendermint/tendermint/abci/types"
	"github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/internal/consensus"
	"github.com/tendermint/tendermint/internal/mempool/mock"
	"github.com/tendermint/tendermint/internal/p2p"
	"github.com/tendermint/tendermint/internal/p2p/p2ptest"
	"github.com/tendermint/tendermint/internal/proxy"
	sm "github.com/tendermint/tendermint/internal/state"
	sf "github.com/tendermint/tendermint/internal/state/test/factory"
	"github.com/tendermint/tendermint/internal/store"
	"github.com/tendermint/tendermint/internal/test/factory"
	"github.com/tendermint/tendermint/libs/log"
	bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync"
	"github.com/tendermint/tendermint/types"
)

type reactorTestSuite struct {
	network *p2ptest.Network
	logger  log.Logger
	nodes   []types.NodeID

	reactors map[types.NodeID]*Reactor
	app      map[types.NodeID]proxy.AppConns

	blockSyncChannels map[types.NodeID]*p2p.Channel
	peerChans         map[types.NodeID]chan p2p.PeerUpdate
	peerUpdates       map[types.NodeID]*p2p.PeerUpdates

	blockSync bool
}

// setup constructs a reactorTestSuite with one node per entry in
// maxBlockHeights, pre-populating each node's block store to that height.
func setup(
	t *testing.T,
	genDoc *types.GenesisDoc,
	privVal types.PrivValidator,
	maxBlockHeights []int64,
	chBuf uint,
) *reactorTestSuite {
	t.Helper()

	numNodes := len(maxBlockHeights)
	require.True(t, numNodes >= 1,
		"must specify at least one block height (nodes)")

	rts := &reactorTestSuite{
		logger:            log.TestingLogger().With("module", "block_sync", "testCase", t.Name()),
		network:           p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes}),
		nodes:             make([]types.NodeID, 0, numNodes),
		reactors:          make(map[types.NodeID]*Reactor, numNodes),
		app:               make(map[types.NodeID]proxy.AppConns, numNodes),
		blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
		peerChans:         make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
		peerUpdates:       make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
		blockSync:         true,
	}

	chDesc := p2p.ChannelDescriptor{ID: byte(BlockSyncChannel)}
	rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(bcproto.Message), int(chBuf))

	i := 0
	for nodeID := range rts.network.Nodes {
		rts.addNode(t, nodeID, genDoc, privVal, maxBlockHeights[i])
		i++
	}

	t.Cleanup(func() {
		for _, nodeID := range rts.nodes {
			rts.peerUpdates[nodeID].Close()

			if rts.reactors[nodeID].IsRunning() {
				require.NoError(t, rts.reactors[nodeID].Stop())
				require.NoError(t, rts.app[nodeID].Stop())
				require.False(t, rts.reactors[nodeID].IsRunning())
			}
		}
	})

	return rts
}

func (rts *reactorTestSuite) addNode(
	t *testing.T,
	nodeID types.NodeID,
	genDoc *types.GenesisDoc,
	privVal types.PrivValidator,
	maxBlockHeight int64,
) {
	t.Helper()

	rts.nodes = append(rts.nodes, nodeID)
	rts.app[nodeID] = proxy.NewAppConns(abciclient.NewLocalCreator(&abci.BaseApplication{}), proxy.NopMetrics())
	require.NoError(t, rts.app[nodeID].Start())

	blockDB := dbm.NewMemDB()
	stateDB := dbm.NewMemDB()
	stateStore := sm.NewStore(stateDB)
	blockStore := store.NewBlockStore(blockDB)

	state, err := sm.MakeGenesisState(genDoc)
	require.NoError(t, err)
	require.NoError(t, stateStore.Save(state))

	blockExec := sm.NewBlockExecutor(
		stateStore,
		log.TestingLogger(),
		rts.app[nodeID].Consensus(),
		mock.Mempool{},
		sm.EmptyEvidencePool{},
		blockStore,
	)

	// Produce and apply blocks up to maxBlockHeight, signing a commit for
	// each previous block so the resulting chain is valid.
	for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
		lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil)

		if blockHeight > 1 {
			lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
			lastBlock := blockStore.LoadBlock(blockHeight - 1)

			vote, err := factory.MakeVote(
				privVal,
				lastBlock.Header.ChainID, 0,
				lastBlock.Header.Height, 0, 2,
				lastBlockMeta.BlockID,
				time.Now(),
			)
			require.NoError(t, err)

			lastCommit = types.NewCommit(
				vote.Height,
				vote.Round,
				lastBlockMeta.BlockID,
				[]types.CommitSig{vote.CommitSig()},
			)
		}

		thisBlock := sf.MakeBlock(state, blockHeight, lastCommit)
		thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
		blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()}

		state, err = blockExec.ApplyBlock(state, blockID, thisBlock)
		require.NoError(t, err)

		blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
	}

	rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
	rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
	rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID])

	rts.reactors[nodeID], err = NewReactor(
		rts.logger.With("nodeID", nodeID),
		state.Copy(),
		blockExec,
		blockStore,
		nil,
		rts.blockSyncChannels[nodeID],
		rts.peerUpdates[nodeID],
		rts.blockSync,
		consensus.NopMetrics())
	require.NoError(t, err)

	require.NoError(t, rts.reactors[nodeID].Start())
	require.True(t, rts.reactors[nodeID].IsRunning())
}

func (rts *reactorTestSuite) start(t *testing.T) {
	t.Helper()
	rts.network.Start(t)
	require.Len(t,
		rts.network.RandomNode().PeerManager.Peers(),
		len(rts.nodes)-1,
		"network does not have expected number of nodes")
}

func TestReactor_AbruptDisconnect(t *testing.T) {
	cfg, err := config.ResetTestRoot("block_sync_reactor_test")
	require.NoError(t, err)
	defer os.RemoveAll(cfg.RootDir)

	genDoc, privVals := factory.RandGenesisDoc(cfg, 1, false, 30)
	maxBlockHeight := int64(64)

	rts := setup(t, genDoc, privVals[0], []int64{maxBlockHeight, 0}, 0)

	require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height())

	rts.start(t)

	secondaryPool := rts.reactors[rts.nodes[1]].pool

	require.Eventually(
		t,
		func() bool {
			height, _, _ := secondaryPool.GetStatus()
			return secondaryPool.MaxPeerHeight() > 0 && height > 0 && height < 10
		},
		10*time.Second,
		10*time.Millisecond,
		"expected node to be partially synced",
	)

	// Remove the synced node from the syncing node, which should not result
	// in any deadlocks or race conditions within the context of poolRoutine.
	rts.peerChans[rts.nodes[1]] <- p2p.PeerUpdate{
		Status: p2p.PeerStatusDown,
		NodeID: rts.nodes[0],
	}
	rts.network.Nodes[rts.nodes[1]].PeerManager.Disconnected(rts.nodes[0])
}

func TestReactor_SyncTime(t *testing.T) {
	cfg, err := config.ResetTestRoot("block_sync_reactor_test")
	require.NoError(t, err)
	defer os.RemoveAll(cfg.RootDir)

	genDoc, privVals := factory.RandGenesisDoc(cfg, 1, false, 30)
	maxBlockHeight := int64(101)

	rts := setup(t, genDoc, privVals[0], []int64{maxBlockHeight, 0}, 0)
	require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height())
	rts.start(t)

	require.Eventually(
		t,
		func() bool {
			return rts.reactors[rts.nodes[1]].GetRemainingSyncTime() > time.Nanosecond &&
				rts.reactors[rts.nodes[1]].pool.getLastSyncRate() > 0.001
		},
		10*time.Second,
		10*time.Millisecond,
		"expected node to be partially synced",
	)
}

func TestReactor_NoBlockResponse(t *testing.T) {
	cfg, err := config.ResetTestRoot("block_sync_reactor_test")
	require.NoError(t, err)
	defer os.RemoveAll(cfg.RootDir)

	genDoc, privVals := factory.RandGenesisDoc(cfg, 1, false, 30)
	maxBlockHeight := int64(65)

	rts := setup(t, genDoc, privVals[0], []int64{maxBlockHeight, 0}, 0)

	require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height())

	rts.start(t)

	testCases := []struct {
		height   int64
		existent bool
	}{
		{maxBlockHeight + 2, false},
		{10, true},
		{1, true},
		{100, false},
	}

	secondaryPool := rts.reactors[rts.nodes[1]].pool
	require.Eventually(
		t,
		func() bool { return secondaryPool.MaxPeerHeight() > 0 && secondaryPool.IsCaughtUp() },
		10*time.Second,
		10*time.Millisecond,
		"expected node to be fully synced",
	)

	for _, tc := range testCases {
		block := rts.reactors[rts.nodes[1]].store.LoadBlock(tc.height)
		if tc.existent {
			require.NotNil(t, block)
		} else {
			require.Nil(t, block)
		}
	}
}

func TestReactor_BadBlockStopsPeer(t *testing.T) {
	// Ultimately, this should be refactored to be less integration test oriented
	// and more unit test oriented by simply testing channel sends and receives.
	// See: https://github.com/tendermint/tendermint/issues/6005
	t.SkipNow()

	cfg, err := config.ResetTestRoot("block_sync_reactor_test")
	require.NoError(t, err)
	defer os.RemoveAll(cfg.RootDir)

	maxBlockHeight := int64(48)
	genDoc, privVals := factory.RandGenesisDoc(cfg, 1, false, 30)

	rts := setup(t, genDoc, privVals[0], []int64{maxBlockHeight, 0, 0, 0, 0}, 1000)

	require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height())

	rts.start(t)

	require.Eventually(
		t,
		func() bool {
			caughtUp := true
			for _, id := range rts.nodes[1 : len(rts.nodes)-1] {
				if rts.reactors[id].pool.MaxPeerHeight() == 0 || !rts.reactors[id].pool.IsCaughtUp() {
					caughtUp = false
				}
			}

			return caughtUp
		},
		10*time.Minute,
		10*time.Millisecond,
		"expected all nodes to be fully synced",
	)

	for _, id := range rts.nodes[:len(rts.nodes)-1] {
		require.Len(t, rts.reactors[id].pool.peers, 3)
	}

	// Add a node with a different genesis doc, which will cause the new node
	// to consider its peers invalid and disconnect from them.
	//
	// XXX: This causes a potential race condition.
	// See: https://github.com/tendermint/tendermint/issues/6005
	otherGenDoc, otherPrivVals := factory.RandGenesisDoc(cfg, 1, false, 30)
	newNode := rts.network.MakeNode(t, p2ptest.NodeOptions{
		MaxPeers:     uint16(len(rts.nodes) + 1),
		MaxConnected: uint16(len(rts.nodes) + 1),
	})
	rts.addNode(t, newNode.NodeID, otherGenDoc, otherPrivVals[0], maxBlockHeight)

	// Add a fake peer so we do not have to wait for the consensus ticker to
	// time out.
	rts.reactors[newNode.NodeID].pool.SetPeerRange("00ff", 10, 10)

	// Wait for the new peer to catch up and become fully synced.
	require.Eventually(
		t,
		func() bool {
			return rts.reactors[newNode.NodeID].pool.MaxPeerHeight() > 0 && rts.reactors[newNode.NodeID].pool.IsCaughtUp()
		},
		10*time.Minute,
		10*time.Millisecond,
		"expected new node to be fully synced",
	)

	require.Eventuallyf(
		t,
		func() bool { return len(rts.reactors[newNode.NodeID].pool.peers) < len(rts.nodes)-1 },
		10*time.Minute,
		10*time.Millisecond,
		"invalid number of peers; expected < %d, got: %d",
		len(rts.nodes)-1,
		len(rts.reactors[newNode.NodeID].pool.peers),
	)
}