Mirror of https://github.com/tendermint/tendermint.git, synced 2026-01-07 05:46:32 +00:00
* internal/proxy: add initial set of abci metrics (#7115)

  This PR adds an initial set of metrics for use with ABCI. The initial metrics enable the calculation of timing histograms and call counts for each of the ABCI methods. The metrics are also labeled as either 'sync' or 'async' to indicate whether the method call was performed using ABCI's `*Async` methods. An example of these metrics is included here for reference:

  ```
  tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.0001"} 0
  tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.0004"} 5
  tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.002"} 12
  tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.009"} 13
  tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.02"} 13
  tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.1"} 13
  tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="0.65"} 13
  tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="2"} 13
  tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="6"} 13
  tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="25"} 13
  tendermint_abci_connection_method_timing_bucket{chain_id="ci",method="commit",type="sync",le="+Inf"} 13
  tendermint_abci_connection_method_timing_sum{chain_id="ci",method="commit",type="sync"} 0.007802058000000001
  tendermint_abci_connection_method_timing_count{chain_id="ci",method="commit",type="sync"} 13
  ```

  These metrics can easily be graphed using Prometheus's `histogram_quantile(...)` function to pick out a particular quantile to graph or examine (see the example query after this list). I chose buckets as a rough estimate of the expected range of times for ABCI operations: they start at 0.0001 seconds and range up to 25 seconds, in the hope that this range captures enough possible times to be useful for us and operators.

* fixup
* fixups
* docs: add abci timing metrics to the metrics docs (#7311)
* format table
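These histograms plug straight into standard Prometheus queries. For example, one could graph a per-method latency quantile with a query along the following lines (the 0.95 quantile and the 5m rate window are illustrative choices, not something this PR prescribes):

```
# 95th-percentile ABCI method latency over the last 5 minutes,
# grouped by method and by sync/async call type.
histogram_quantile(
  0.95,
  sum(rate(tendermint_abci_connection_method_timing_bucket[5m])) by (le, method, type)
)
```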
329 lines
8.8 KiB
Go
package blockchain

import (
	"fmt"
	"os"
	"sort"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	dbm "github.com/tendermint/tm-db"

	abci "github.com/tendermint/tendermint/abci/types"
	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/mempool/mock"
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/proxy"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/store"
	"github.com/tendermint/tendermint/types"
	tmtime "github.com/tendermint/tendermint/types/time"
)

var config *cfg.Config

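// randGenesisDoc generates a genesis doc with numValidators validators and
// their matching private validators, sorted by address. Each validator gets
// minPower voting power, randomized upward when randPower is set.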
func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) {
	validators := make([]types.GenesisValidator, numValidators)
	privValidators := make([]types.PrivValidator, numValidators)
	for i := 0; i < numValidators; i++ {
		val, privVal := types.RandValidator(randPower, minPower)
		validators[i] = types.GenesisValidator{
			PubKey: val.PubKey,
			Power:  val.VotingPower,
		}
		privValidators[i] = privVal
	}
	sort.Sort(types.PrivValidatorsByAddress(privValidators))

	return &types.GenesisDoc{
		GenesisTime: tmtime.Now(),
		ChainID:     config.ChainID(),
		Validators:  validators,
	}, privValidators
}

type ReactorPair struct {
	reactor *Reactor
	app     proxy.AppConns
}

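// newReactor wires a blockchain Reactor to a local ABCI app connection,
// backed by in-memory stores, and commits maxBlockHeight blocks signed by
// the single private validator so the reactor starts with history to serve.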
func newReactor(
	logger log.Logger,
	genDoc *types.GenesisDoc,
	privVals []types.PrivValidator,
	maxBlockHeight int64) ReactorPair {
	if len(privVals) != 1 {
		panic("only support one validator")
	}

	app := &testApp{}
	cc := proxy.NewLocalClientCreator(app)
	proxyApp := proxy.NewAppConns(cc, proxy.NopMetrics())
	err := proxyApp.Start()
	if err != nil {
		panic(fmt.Errorf("error starting app: %w", err))
	}

	blockDB := dbm.NewMemDB()
	stateDB := dbm.NewMemDB()
	stateStore := sm.NewStore(stateDB)
	blockStore := store.NewBlockStore(blockDB)

	state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc)
	if err != nil {
		panic(fmt.Errorf("error constructing state from genesis file: %w", err))
	}

	// Make the Reactor itself.
	// NOTE we have to create and commit the blocks first because
	// pool.height is determined from the store.
	fastSync := true
	db := dbm.NewMemDB()
	stateStore = sm.NewStore(db)
	blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(),
		mock.Mempool{}, sm.EmptyEvidencePool{})
	if err = stateStore.Save(state); err != nil {
		panic(err)
	}

	// Commit blocks up to maxBlockHeight so the reactor has a chain to serve.
	for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
		lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil)
		if blockHeight > 1 {
			lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
			lastBlock := blockStore.LoadBlock(blockHeight - 1)

			vote, err := types.MakeVote(
				lastBlock.Header.Height,
				lastBlockMeta.BlockID,
				state.Validators,
				privVals[0],
				lastBlock.Header.ChainID,
				time.Now(),
			)
			if err != nil {
				panic(err)
			}
			lastCommit = types.NewCommit(vote.Height, vote.Round,
				lastBlockMeta.BlockID, []types.CommitSig{vote.CommitSig()})
		}

		thisBlock := makeBlock(blockHeight, state, lastCommit)

		thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
		blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()}

		state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock)
		if err != nil {
			panic(fmt.Errorf("error applying block: %w", err))
		}

		blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
	}

	bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync)
	bcReactor.SetLogger(logger.With("module", "blockchain"))

	return ReactorPair{bcReactor, proxyApp}
}

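// TestNoBlockResponse connects an empty reactor to one holding maxBlockHeight
// blocks, waits for the empty one to catch up, and then checks that blocks
// are served for heights within the chain and not for heights beyond it.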
func TestNoBlockResponse(t *testing.T) {
	config = cfg.ResetTestRoot("blockchain_reactor_test")
	defer os.RemoveAll(config.RootDir)
	genDoc, privVals := randGenesisDoc(1, false, 30)

	maxBlockHeight := int64(65)

	reactorPairs := make([]ReactorPair, 2)

	reactorPairs[0] = newReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
	reactorPairs[1] = newReactor(log.TestingLogger(), genDoc, privVals, 0)

	p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
		s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
		return s
	}, p2p.Connect2Switches)

	defer func() {
		for _, r := range reactorPairs {
			err := r.reactor.Stop()
			require.NoError(t, err)
			err = r.app.Stop()
			require.NoError(t, err)
		}
	}()

	tests := []struct {
		height   int64
		existent bool
	}{
		{maxBlockHeight + 2, false},
		{10, true},
		{1, true},
		{100, false},
	}

	for {
		if reactorPairs[1].reactor.pool.IsCaughtUp() {
			break
		}

		time.Sleep(10 * time.Millisecond)
	}

	assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height())

	for _, tt := range tests {
		block := reactorPairs[1].reactor.store.LoadBlock(tt.height)
		if tt.existent {
			assert.NotNil(t, block)
		} else {
			assert.Nil(t, block)
		}
	}
}

// NOTE: This is too hard to test without
// an easy way to add a test peer to the switch
// or without significant refactoring of the module.
// Alternatively we could actually dial a TCP conn but
// that seems extreme.
func TestBadBlockStopsPeer(t *testing.T) {
	config = cfg.ResetTestRoot("blockchain_reactor_test")
	defer os.RemoveAll(config.RootDir)
	genDoc, privVals := randGenesisDoc(1, false, 30)

	maxBlockHeight := int64(148)

	// Other chain needs a different validator set
	otherGenDoc, otherPrivVals := randGenesisDoc(1, false, 30)
	otherChain := newReactor(log.TestingLogger(), otherGenDoc, otherPrivVals, maxBlockHeight)

	defer func() {
		// otherChain's reactor is never added to a switch or started,
		// so stopping it is expected to return an error.
		err := otherChain.reactor.Stop()
		require.Error(t, err)
		err = otherChain.app.Stop()
		require.NoError(t, err)
	}()

	reactorPairs := make([]ReactorPair, 4)

	reactorPairs[0] = newReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
	reactorPairs[1] = newReactor(log.TestingLogger(), genDoc, privVals, 0)
	reactorPairs[2] = newReactor(log.TestingLogger(), genDoc, privVals, 0)
	reactorPairs[3] = newReactor(log.TestingLogger(), genDoc, privVals, 0)

	switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
		s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
		return s
	}, p2p.Connect2Switches)

	defer func() {
		for _, r := range reactorPairs {
			err := r.reactor.Stop()
			require.NoError(t, err)

			err = r.app.Stop()
			require.NoError(t, err)
		}
	}()

	for {
		time.Sleep(1 * time.Second)
		caughtUp := true
		for _, r := range reactorPairs {
			if !r.reactor.pool.IsCaughtUp() {
				caughtUp = false
			}
		}
		if caughtUp {
			break
		}
	}

	// at this time, reactors[0-3] are caught up and fully connected to one another
	assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size())

	// Mark reactorPairs[3] as an invalid peer. Fiddling with .store without a mutex is a data
	// race, but can't be easily avoided.
	reactorPairs[3].reactor.store = otherChain.reactor.store

	lastReactorPair := newReactor(log.TestingLogger(), genDoc, privVals, 0)
	reactorPairs = append(reactorPairs, lastReactorPair)

	switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
		s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
		return s
	}, p2p.Connect2Switches)...)

	for i := 0; i < len(reactorPairs)-1; i++ {
		p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
	}

	for {
		if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
			break
		}

		time.Sleep(1 * time.Second)
	}

	assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
}

//----------------------------------------------
// utility funcs

func makeTxs(height int64) (txs []types.Tx) {
	for i := 0; i < 10; i++ {
		txs = append(txs, types.Tx([]byte{byte(height), byte(i)}))
	}
	return txs
}

func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block {
	block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address)
	return block
}

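// testApp is a minimal no-op ABCI application used to drive block execution
// in these tests; embedding abci.BaseApplication fills in any methods not
// overridden below.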
type testApp struct {
	abci.BaseApplication
}

var _ abci.Application = (*testApp)(nil)

func (app *testApp) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) {
	return abci.ResponseInfo{}
}

func (app *testApp) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock {
	return abci.ResponseBeginBlock{}
}

func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
	return abci.ResponseEndBlock{}
}

func (app *testApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
	return abci.ResponseDeliverTx{Events: []abci.Event{}}
}

func (app *testApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
	return abci.ResponseCheckTx{}
}

func (app *testApp) Commit() abci.ResponseCommit {
	return abci.ResponseCommit{}
}

func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQuery) {
	return
}