mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-04 11:02:06 +00:00
* internal/proxy: add initial set of abci metrics (#7115) This PR adds an initial set of metrics for use with ABCI. The initial metrics enable the calculation of timing histograms and call counts for each of the ABCI methods. The metrics are also labeled as either 'sync' or 'async' to determine if the method call was performed using ABCI's `*Async` methods. An example of these metrics is included here for reference: ``` tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.0001"} 0 tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.0004"} 5 tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.002"} 12 tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.009"} 13 tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.02"} 13 tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.1"} 13 tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.65"} 13 tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="2"} 13 tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="6"} 13 tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="25"} 13 tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="+Inf"} 13 tendermint_abci_connection_method_timing_sum{chain_id="ci",method="commit",type="sync"} 0.007802058000000001 tendermint_abci_connection_method_timing_count{chain_id="ci",method="commit",type="sync"} 13 ``` These metrics can easily be graphed using prometheus's `histogram_quantile(...)` method to pick out a particular quantile to graph or examine. 
I chose buckets that were somewhat of an estimate of expected range of times for ABCI operations. They start at .0001 seconds and range to 25 seconds. The hope is that this range captures enough possible times to be useful for us and operators. * lint++ * docs: add abci timing metrics to the metrics docs (#7311) * cherry-pick fixup
364 lines
10 KiB
Go
364 lines
10 KiB
Go
package v0
|
|
|
|
import (
|
|
"os"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
dbm "github.com/tendermint/tm-db"
|
|
|
|
abciclient "github.com/tendermint/tendermint/abci/client"
|
|
abci "github.com/tendermint/tendermint/abci/types"
|
|
"github.com/tendermint/tendermint/config"
|
|
"github.com/tendermint/tendermint/internal/consensus"
|
|
"github.com/tendermint/tendermint/internal/mempool/mock"
|
|
"github.com/tendermint/tendermint/internal/p2p"
|
|
"github.com/tendermint/tendermint/internal/p2p/p2ptest"
|
|
"github.com/tendermint/tendermint/internal/proxy"
|
|
sm "github.com/tendermint/tendermint/internal/state"
|
|
sf "github.com/tendermint/tendermint/internal/state/test/factory"
|
|
"github.com/tendermint/tendermint/internal/store"
|
|
"github.com/tendermint/tendermint/internal/test/factory"
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync"
|
|
"github.com/tendermint/tendermint/types"
|
|
)
|
|
|
|
// reactorTestSuite bundles the per-node fixtures needed to exercise the
// block-sync Reactor over an in-memory p2p test network.
type reactorTestSuite struct {
	network *p2ptest.Network
	logger  log.Logger
	nodes   []types.NodeID

	reactors map[types.NodeID]*Reactor
	app      map[types.NodeID]proxy.AppConns

	// blockSyncChannels carries bcproto messages between the test nodes.
	blockSyncChannels map[types.NodeID]*p2p.Channel
	// peerChans feeds simulated peer up/down updates into peerUpdates.
	peerChans   map[types.NodeID]chan p2p.PeerUpdate
	peerUpdates map[types.NodeID]*p2p.PeerUpdates

	// blockSync is passed to each Reactor; set to true by setup.
	blockSync bool
}
|
|
|
|
// setup creates a reactorTestSuite with one node per entry in
// maxBlockHeights; each node's block store is pre-populated up to the
// corresponding height via addNode. chBuf sets the buffer size of the
// block-sync channels. Cleanup (closing peer updates, stopping reactors
// and app connections) is registered on t.
func setup(
	t *testing.T,
	genDoc *types.GenesisDoc,
	privVal types.PrivValidator,
	maxBlockHeights []int64,
	chBuf uint,
) *reactorTestSuite {
	t.Helper()

	numNodes := len(maxBlockHeights)
	require.True(t, numNodes >= 1,
		"must specify at least one block height (nodes)")

	rts := &reactorTestSuite{
		logger:            log.TestingLogger().With("module", "block_sync", "testCase", t.Name()),
		network:           p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes}),
		nodes:             make([]types.NodeID, 0, numNodes),
		reactors:          make(map[types.NodeID]*Reactor, numNodes),
		app:               make(map[types.NodeID]proxy.AppConns, numNodes),
		blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
		peerChans:         make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
		peerUpdates:       make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
		blockSync:         true,
	}

	chDesc := p2p.ChannelDescriptor{ID: byte(BlockSyncChannel)}
	// NoCleanup: channel teardown is handled by the t.Cleanup below.
	rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(bcproto.Message), int(chBuf))

	i := 0
	for nodeID := range rts.network.Nodes {
		rts.addNode(t, nodeID, genDoc, privVal, maxBlockHeights[i])
		i++
	}

	t.Cleanup(func() {
		for _, nodeID := range rts.nodes {
			rts.peerUpdates[nodeID].Close()

			if rts.reactors[nodeID].IsRunning() {
				require.NoError(t, rts.reactors[nodeID].Stop())
				require.NoError(t, rts.app[nodeID].Stop())
				require.False(t, rts.reactors[nodeID].IsRunning())
			}
		}
	})

	return rts
}
|
|
|
|
// addNode wires a single node into the suite: it starts a local ABCI app,
// builds and applies maxBlockHeight blocks (each committed by privVal) so
// the node's block store is pre-populated, and then creates and starts the
// node's block-sync Reactor.
func (rts *reactorTestSuite) addNode(t *testing.T,
	nodeID types.NodeID,
	genDoc *types.GenesisDoc,
	privVal types.PrivValidator,
	maxBlockHeight int64,
) {
	t.Helper()

	rts.nodes = append(rts.nodes, nodeID)
	rts.app[nodeID] = proxy.NewAppConns(abciclient.NewLocalCreator(&abci.BaseApplication{}), proxy.NopMetrics())
	require.NoError(t, rts.app[nodeID].Start())

	blockDB := dbm.NewMemDB()
	stateDB := dbm.NewMemDB()
	stateStore := sm.NewStore(stateDB)
	blockStore := store.NewBlockStore(blockDB)

	state, err := sm.MakeGenesisState(genDoc)
	require.NoError(t, err)
	require.NoError(t, stateStore.Save(state))

	blockExec := sm.NewBlockExecutor(
		stateStore,
		log.TestingLogger(),
		rts.app[nodeID].Consensus(),
		mock.Mempool{},
		sm.EmptyEvidencePool{},
		blockStore,
	)

	// Build the chain: for each height, create a commit for the previous
	// block (empty at height 1), make a block, apply it, and store it.
	for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
		lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil)

		if blockHeight > 1 {
			lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
			lastBlock := blockStore.LoadBlock(blockHeight - 1)

			vote, err := factory.MakeVote(
				privVal,
				lastBlock.Header.ChainID, 0,
				lastBlock.Header.Height, 0, 2,
				lastBlockMeta.BlockID,
				time.Now(),
			)
			require.NoError(t, err)

			// Single-validator commit signed by privVal.
			lastCommit = types.NewCommit(
				vote.Height,
				vote.Round,
				lastBlockMeta.BlockID,
				[]types.CommitSig{vote.CommitSig()},
			)
		}

		thisBlock := sf.MakeBlock(state, blockHeight, lastCommit)
		thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
		blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()}

		state, err = blockExec.ApplyBlock(state, blockID, thisBlock)
		require.NoError(t, err)

		blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
	}

	rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
	rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
	rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID])
	rts.reactors[nodeID], err = NewReactor(
		rts.logger.With("nodeID", nodeID),
		state.Copy(),
		blockExec,
		blockStore,
		nil,
		rts.blockSyncChannels[nodeID],
		rts.peerUpdates[nodeID],
		rts.blockSync,
		consensus.NopMetrics())
	require.NoError(t, err)

	require.NoError(t, rts.reactors[nodeID].Start())
	require.True(t, rts.reactors[nodeID].IsRunning())
}
|
|
|
|
func (rts *reactorTestSuite) start(t *testing.T) {
|
|
t.Helper()
|
|
rts.network.Start(t)
|
|
require.Len(t,
|
|
rts.network.RandomNode().PeerManager.Peers(),
|
|
len(rts.nodes)-1,
|
|
"network does not have expected number of nodes")
|
|
}
|
|
|
|
// TestReactor_AbruptDisconnect starts a primary node holding 64 blocks and
// an empty secondary, waits until the secondary is partially synced
// (height in (0, 10)), then abruptly marks the primary as down. The test
// passes if the disconnect triggers no deadlock or race in poolRoutine.
func TestReactor_AbruptDisconnect(t *testing.T) {
	cfg, err := config.ResetTestRoot("block_sync_reactor_test")
	require.NoError(t, err)
	defer os.RemoveAll(cfg.RootDir)

	genDoc, privVals := factory.RandGenesisDoc(cfg, 1, false, 30)
	maxBlockHeight := int64(64)

	rts := setup(t, genDoc, privVals[0], []int64{maxBlockHeight, 0}, 0)

	require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height())

	rts.start(t)

	secondaryPool := rts.reactors[rts.nodes[1]].pool

	require.Eventually(
		t,
		func() bool {
			height, _, _ := secondaryPool.GetStatus()
			return secondaryPool.MaxPeerHeight() > 0 && height > 0 && height < 10
		},
		10*time.Second,
		10*time.Millisecond,
		"expected node to be partially synced",
	)

	// Remove synced node from the syncing node which should not result in any
	// deadlocks or race conditions within the context of poolRoutine.
	rts.peerChans[rts.nodes[1]] <- p2p.PeerUpdate{
		Status: p2p.PeerStatusDown,
		NodeID: rts.nodes[0],
	}
	rts.network.Nodes[rts.nodes[1]].PeerManager.Disconnected(rts.nodes[0])
}
|
|
|
|
// TestReactor_SyncTime verifies that while a node is syncing, the reactor
// reports a positive remaining-sync-time estimate and the pool reports a
// non-trivial last sync rate.
func TestReactor_SyncTime(t *testing.T) {
	cfg, err := config.ResetTestRoot("block_sync_reactor_test")
	require.NoError(t, err)
	defer os.RemoveAll(cfg.RootDir)

	genDoc, privVals := factory.RandGenesisDoc(cfg, 1, false, 30)
	maxBlockHeight := int64(101)

	rts := setup(t, genDoc, privVals[0], []int64{maxBlockHeight, 0}, 0)
	require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height())
	rts.start(t)

	require.Eventually(
		t,
		func() bool {
			return rts.reactors[rts.nodes[1]].GetRemainingSyncTime() > time.Nanosecond &&
				rts.reactors[rts.nodes[1]].pool.getLastSyncRate() > 0.001
		},
		10*time.Second,
		10*time.Millisecond,
		"expected node to be partially synced",
	)
}
|
|
|
|
func TestReactor_NoBlockResponse(t *testing.T) {
|
|
cfg, err := config.ResetTestRoot("block_sync_reactor_test")
|
|
require.NoError(t, err)
|
|
|
|
defer os.RemoveAll(cfg.RootDir)
|
|
|
|
genDoc, privVals := factory.RandGenesisDoc(cfg, 1, false, 30)
|
|
maxBlockHeight := int64(65)
|
|
|
|
rts := setup(t, genDoc, privVals[0], []int64{maxBlockHeight, 0}, 0)
|
|
|
|
require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height())
|
|
|
|
rts.start(t)
|
|
|
|
testCases := []struct {
|
|
height int64
|
|
existent bool
|
|
}{
|
|
{maxBlockHeight + 2, false},
|
|
{10, true},
|
|
{1, true},
|
|
{100, false},
|
|
}
|
|
|
|
secondaryPool := rts.reactors[rts.nodes[1]].pool
|
|
require.Eventually(
|
|
t,
|
|
func() bool { return secondaryPool.MaxPeerHeight() > 0 && secondaryPool.IsCaughtUp() },
|
|
10*time.Second,
|
|
10*time.Millisecond,
|
|
"expected node to be fully synced",
|
|
)
|
|
|
|
for _, tc := range testCases {
|
|
block := rts.reactors[rts.nodes[1]].store.LoadBlock(tc.height)
|
|
if tc.existent {
|
|
require.True(t, block != nil)
|
|
} else {
|
|
require.Nil(t, block)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestReactor_BadBlockStopsPeer syncs several nodes from one primary, then
// adds a new node built from a DIFFERENT genesis doc (so its blocks are
// invalid from the others' perspective) and asserts the other peers drop it.
// Currently skipped: the scenario is racy (see issue #6005 below).
func TestReactor_BadBlockStopsPeer(t *testing.T) {
	// Ultimately, this should be refactored to be less integration test oriented
	// and more unit test oriented by simply testing channel sends and receives.
	// See: https://github.com/tendermint/tendermint/issues/6005
	t.SkipNow()

	cfg, err := config.ResetTestRoot("block_sync_reactor_test")
	require.NoError(t, err)
	defer os.RemoveAll(cfg.RootDir)

	maxBlockHeight := int64(48)
	genDoc, privVals := factory.RandGenesisDoc(cfg, 1, false, 30)

	rts := setup(t, genDoc, privVals[0], []int64{maxBlockHeight, 0, 0, 0, 0}, 1000)

	require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height())

	rts.start(t)

	require.Eventually(
		t,
		func() bool {
			caughtUp := true
			for _, id := range rts.nodes[1 : len(rts.nodes)-1] {
				if rts.reactors[id].pool.MaxPeerHeight() == 0 || !rts.reactors[id].pool.IsCaughtUp() {
					caughtUp = false
				}
			}

			return caughtUp
		},
		10*time.Minute,
		10*time.Millisecond,
		"expected all nodes to be fully synced",
	)

	for _, id := range rts.nodes[:len(rts.nodes)-1] {
		require.Len(t, rts.reactors[id].pool.peers, 3)
	}

	// Mark testSuites[3] as an invalid peer which will cause newSuite to disconnect
	// from this peer.
	//
	// XXX: This causes a potential race condition.
	// See: https://github.com/tendermint/tendermint/issues/6005
	otherGenDoc, otherPrivVals := factory.RandGenesisDoc(cfg, 1, false, 30)
	newNode := rts.network.MakeNode(t, p2ptest.NodeOptions{
		MaxPeers:     uint16(len(rts.nodes) + 1),
		MaxConnected: uint16(len(rts.nodes) + 1),
	})
	rts.addNode(t, newNode.NodeID, otherGenDoc, otherPrivVals[0], maxBlockHeight)

	// add a fake peer just so we do not wait for the consensus ticker to timeout
	rts.reactors[newNode.NodeID].pool.SetPeerRange("00ff", 10, 10)

	// wait for the new peer to catch up and become fully synced
	require.Eventually(
		t,
		func() bool {
			return rts.reactors[newNode.NodeID].pool.MaxPeerHeight() > 0 && rts.reactors[newNode.NodeID].pool.IsCaughtUp()
		},
		10*time.Minute,
		10*time.Millisecond,
		"expected new node to be fully synced",
	)

	require.Eventuallyf(
		t,
		func() bool { return len(rts.reactors[newNode.NodeID].pool.peers) < len(rts.nodes)-1 },
		10*time.Minute,
		10*time.Millisecond,
		"invalid number of peers; expected < %d, got: %d",
		len(rts.nodes)-1,
		len(rts.reactors[newNode.NodeID].pool.peers),
	)
}
|