mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-07 13:55:17 +00:00
Merge pull request #2922 from tendermint/release/v0.26.4
Release/v0.26.4
This commit is contained in:
43
CHANGELOG.md
43
CHANGELOG.md
@@ -1,5 +1,48 @@
|
||||
# Changelog
|
||||
|
||||
## v0.26.4
|
||||
|
||||
*November 27th, 2018*
|
||||
|
||||
Special thanks to external contributors on this release:
|
||||
ackratos, goolAdapter, james-ray, joe-bowman, kostko,
|
||||
nagarajmanjunath, tomtau
|
||||
|
||||
|
||||
Friendly reminder, we have a [bug bounty
|
||||
program](https://hackerone.com/tendermint).
|
||||
|
||||
### FEATURES:
|
||||
|
||||
- [rpc] [\#2747](https://github.com/tendermint/tendermint/issues/2747) Enable subscription to tags emitted from `BeginBlock`/`EndBlock` (@kostko)
|
||||
- [types] [\#2747](https://github.com/tendermint/tendermint/issues/2747) Add `ResultBeginBlock` and `ResultEndBlock` fields to `EventDataNewBlock`
|
||||
and `EventDataNewBlockHeader` to support subscriptions (@kostko)
|
||||
- [types] [\#2918](https://github.com/tendermint/tendermint/issues/2918) Add Marshal, MarshalTo, Unmarshal methods to various structs
|
||||
to support Protobuf compatibility (@nagarajmanjunath)
|
||||
|
||||
### IMPROVEMENTS:
|
||||
|
||||
- [config] [\#2877](https://github.com/tendermint/tendermint/issues/2877) Add `blocktime_iota` to the config.toml (@ackratos)
|
||||
- NOTE: this should be a ConsensusParam, not part of the config, and will be
|
||||
removed from the config at a later date
|
||||
([\#2920](https://github.com/tendermint/tendermint/issues/2920).
|
||||
- [mempool] [\#2882](https://github.com/tendermint/tendermint/issues/2882) Add txs from Update to cache
|
||||
- [mempool] [\#2891](https://github.com/tendermint/tendermint/issues/2891) Remove local int64 counter from being stored in every tx
|
||||
- [node] [\#2866](https://github.com/tendermint/tendermint/issues/2866) Add ability to instantiate IPCVal (@joe-bowman)
|
||||
|
||||
### BUG FIXES:
|
||||
|
||||
- [blockchain] [\#2731](https://github.com/tendermint/tendermint/issues/2731) Retry both blocks if either is bad to avoid getting stuck during fast sync (@goolAdapter)
|
||||
- [consensus] [\#2893](https://github.com/tendermint/tendermint/issues/2893) Use genDoc.Validators instead of state.NextValidators on replay when appHeight==0 (@james-ray)
|
||||
- [log] [\#2868](https://github.com/tendermint/tendermint/issues/2868) Fix `module=main` setting overriding all others
|
||||
- NOTE: this changes the default logging behaviour to be much less verbose.
|
||||
Set `log_level="info"` to restore the previous behaviour.
|
||||
- [rpc] [\#2808](https://github.com/tendermint/tendermint/issues/2808) Fix `accum` field in `/validators` by calling `IncrementAccum` if necessary
|
||||
- [rpc] [\#2811](https://github.com/tendermint/tendermint/issues/2811) Allow integer IDs in JSON-RPC requests (@tomtau)
|
||||
- [txindex/kv] [\#2759](https://github.com/tendermint/tendermint/issues/2759) Fix tx.height range queries
|
||||
- [txindex/kv] [\#2775](https://github.com/tendermint/tendermint/issues/2775) Order tx results by index if height is the same
|
||||
- [txindex/kv] [\#2908](https://github.com/tendermint/tendermint/issues/2908) Don't return false positives when searching for a prefix of a tag value
|
||||
|
||||
## v0.26.3
|
||||
|
||||
*November 17th, 2018*
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Pending
|
||||
|
||||
## v0.26.4
|
||||
## v0.27.0
|
||||
|
||||
*TBD*
|
||||
|
||||
|
||||
@@ -168,9 +168,12 @@ func (pool *BlockPool) IsCaughtUp() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// some conditions to determine if we're caught up
|
||||
receivedBlockOrTimedOut := (pool.height > 0 || time.Since(pool.startTime) > 5*time.Second)
|
||||
ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= pool.maxPeerHeight
|
||||
// Some conditions to determine if we're caught up.
|
||||
// Ensures we've either received a block or waited some amount of time,
|
||||
// and that we're synced to the highest known height. Note we use maxPeerHeight - 1
|
||||
// because to sync block H requires block H+1 to verify the LastCommit.
|
||||
receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second
|
||||
ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1)
|
||||
isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers
|
||||
return isCaughtUp
|
||||
}
|
||||
@@ -252,7 +255,8 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
|
||||
peer.decrPending(blockSize)
|
||||
}
|
||||
} else {
|
||||
// Bad peer?
|
||||
pool.Logger.Info("invalid peer", "peer", peerID, "blockHeight", block.Height)
|
||||
pool.sendError(errors.New("invalid peer"), peerID)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -292,7 +296,7 @@ func (pool *BlockPool) RemovePeer(peerID p2p.ID) {
|
||||
func (pool *BlockPool) removePeer(peerID p2p.ID) {
|
||||
for _, requester := range pool.requesters {
|
||||
if requester.getPeerID() == peerID {
|
||||
requester.redo()
|
||||
requester.redo(peerID)
|
||||
}
|
||||
}
|
||||
delete(pool.peers, peerID)
|
||||
@@ -326,8 +330,11 @@ func (pool *BlockPool) makeNextRequester() {
|
||||
defer pool.mtx.Unlock()
|
||||
|
||||
nextHeight := pool.height + pool.requestersLen()
|
||||
if nextHeight > pool.maxPeerHeight {
|
||||
return
|
||||
}
|
||||
|
||||
request := newBPRequester(pool, nextHeight)
|
||||
// request.SetLogger(pool.Logger.With("height", nextHeight))
|
||||
|
||||
pool.requesters[nextHeight] = request
|
||||
atomic.AddInt32(&pool.numPending, 1)
|
||||
@@ -453,7 +460,7 @@ type bpRequester struct {
|
||||
pool *BlockPool
|
||||
height int64
|
||||
gotBlockCh chan struct{}
|
||||
redoCh chan struct{}
|
||||
redoCh chan p2p.ID //redo may send multitime, add peerId to identify repeat
|
||||
|
||||
mtx sync.Mutex
|
||||
peerID p2p.ID
|
||||
@@ -465,7 +472,7 @@ func newBPRequester(pool *BlockPool, height int64) *bpRequester {
|
||||
pool: pool,
|
||||
height: height,
|
||||
gotBlockCh: make(chan struct{}, 1),
|
||||
redoCh: make(chan struct{}, 1),
|
||||
redoCh: make(chan p2p.ID, 1),
|
||||
|
||||
peerID: "",
|
||||
block: nil,
|
||||
@@ -524,9 +531,9 @@ func (bpr *bpRequester) reset() {
|
||||
// Tells bpRequester to pick another peer and try again.
|
||||
// NOTE: Nonblocking, and does nothing if another redo
|
||||
// was already requested.
|
||||
func (bpr *bpRequester) redo() {
|
||||
func (bpr *bpRequester) redo(peerId p2p.ID) {
|
||||
select {
|
||||
case bpr.redoCh <- struct{}{}:
|
||||
case bpr.redoCh <- peerId:
|
||||
default:
|
||||
}
|
||||
}
|
||||
@@ -565,9 +572,13 @@ OUTER_LOOP:
|
||||
return
|
||||
case <-bpr.Quit():
|
||||
return
|
||||
case <-bpr.redoCh:
|
||||
bpr.reset()
|
||||
continue OUTER_LOOP
|
||||
case peerID := <-bpr.redoCh:
|
||||
if peerID == bpr.peerID {
|
||||
bpr.reset()
|
||||
continue OUTER_LOOP
|
||||
} else {
|
||||
continue WAIT_LOOP
|
||||
}
|
||||
case <-bpr.gotBlockCh:
|
||||
// We got a block!
|
||||
// Continue the for-loop and wait til Quit.
|
||||
|
||||
@@ -16,16 +16,52 @@ func init() {
|
||||
}
|
||||
|
||||
type testPeer struct {
|
||||
id p2p.ID
|
||||
height int64
|
||||
id p2p.ID
|
||||
height int64
|
||||
inputChan chan inputData //make sure each peer's data is sequential
|
||||
}
|
||||
|
||||
func makePeers(numPeers int, minHeight, maxHeight int64) map[p2p.ID]testPeer {
|
||||
peers := make(map[p2p.ID]testPeer, numPeers)
|
||||
type inputData struct {
|
||||
t *testing.T
|
||||
pool *BlockPool
|
||||
request BlockRequest
|
||||
}
|
||||
|
||||
func (p testPeer) runInputRoutine() {
|
||||
go func() {
|
||||
for input := range p.inputChan {
|
||||
p.simulateInput(input)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Request desired, pretend like we got the block immediately.
|
||||
func (p testPeer) simulateInput(input inputData) {
|
||||
block := &types.Block{Header: types.Header{Height: input.request.Height}}
|
||||
input.pool.AddBlock(input.request.PeerID, block, 123)
|
||||
input.t.Logf("Added block from peer %v (height: %v)", input.request.PeerID, input.request.Height)
|
||||
}
|
||||
|
||||
type testPeers map[p2p.ID]testPeer
|
||||
|
||||
func (ps testPeers) start() {
|
||||
for _, v := range ps {
|
||||
v.runInputRoutine()
|
||||
}
|
||||
}
|
||||
|
||||
func (ps testPeers) stop() {
|
||||
for _, v := range ps {
|
||||
close(v.inputChan)
|
||||
}
|
||||
}
|
||||
|
||||
func makePeers(numPeers int, minHeight, maxHeight int64) testPeers {
|
||||
peers := make(testPeers, numPeers)
|
||||
for i := 0; i < numPeers; i++ {
|
||||
peerID := p2p.ID(cmn.RandStr(12))
|
||||
height := minHeight + cmn.RandInt63n(maxHeight-minHeight)
|
||||
peers[peerID] = testPeer{peerID, height}
|
||||
peers[peerID] = testPeer{peerID, height, make(chan inputData, 10)}
|
||||
}
|
||||
return peers
|
||||
}
|
||||
@@ -45,6 +81,9 @@ func TestBasic(t *testing.T) {
|
||||
|
||||
defer pool.Stop()
|
||||
|
||||
peers.start()
|
||||
defer peers.stop()
|
||||
|
||||
// Introduce each peer.
|
||||
go func() {
|
||||
for _, peer := range peers {
|
||||
@@ -77,12 +116,8 @@ func TestBasic(t *testing.T) {
|
||||
if request.Height == 300 {
|
||||
return // Done!
|
||||
}
|
||||
// Request desired, pretend like we got the block immediately.
|
||||
go func() {
|
||||
block := &types.Block{Header: types.Header{Height: request.Height}}
|
||||
pool.AddBlock(request.PeerID, block, 123)
|
||||
t.Logf("Added block from peer %v (height: %v)", request.PeerID, request.Height)
|
||||
}()
|
||||
|
||||
peers[request.PeerID].inputChan <- inputData{t, pool, request}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -264,8 +264,12 @@ FOR_LOOP:
|
||||
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
|
||||
bcR.pool.Stop()
|
||||
|
||||
conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
|
||||
conR.SwitchToConsensus(state, blocksSynced)
|
||||
conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
|
||||
if ok {
|
||||
conR.SwitchToConsensus(state, blocksSynced)
|
||||
} else {
|
||||
// should only happen during testing
|
||||
}
|
||||
|
||||
break FOR_LOOP
|
||||
}
|
||||
@@ -314,6 +318,13 @@ FOR_LOOP:
|
||||
// still need to clean up the rest.
|
||||
bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err))
|
||||
}
|
||||
peerID2 := bcR.pool.RedoRequest(second.Height)
|
||||
peer2 := bcR.Switch.Peers().Get(peerID2)
|
||||
if peer2 != nil && peer2 != peer {
|
||||
// NOTE: we've already removed the peer's request, but we
|
||||
// still need to clean up the rest.
|
||||
bcR.Switch.StopPeerForError(peer2, fmt.Errorf("BlockchainReactor validation error: %v", err))
|
||||
}
|
||||
continue FOR_LOOP
|
||||
} else {
|
||||
bcR.pool.PopRequest()
|
||||
|
||||
@@ -1,72 +1,151 @@
|
||||
package blockchain
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
dbm "github.com/tendermint/tendermint/libs/db"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
tmtime "github.com/tendermint/tendermint/types/time"
|
||||
)
|
||||
|
||||
func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore) {
|
||||
config := cfg.ResetTestRoot("blockchain_reactor_test")
|
||||
// blockDB := dbm.NewDebugDB("blockDB", dbm.NewMemDB())
|
||||
// stateDB := dbm.NewDebugDB("stateDB", dbm.NewMemDB())
|
||||
var config *cfg.Config
|
||||
|
||||
func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) {
|
||||
validators := make([]types.GenesisValidator, numValidators)
|
||||
privValidators := make([]types.PrivValidator, numValidators)
|
||||
for i := 0; i < numValidators; i++ {
|
||||
val, privVal := types.RandValidator(randPower, minPower)
|
||||
validators[i] = types.GenesisValidator{
|
||||
PubKey: val.PubKey,
|
||||
Power: val.VotingPower,
|
||||
}
|
||||
privValidators[i] = privVal
|
||||
}
|
||||
sort.Sort(types.PrivValidatorsByAddress(privValidators))
|
||||
|
||||
return &types.GenesisDoc{
|
||||
GenesisTime: tmtime.Now(),
|
||||
ChainID: config.ChainID(),
|
||||
Validators: validators,
|
||||
}, privValidators
|
||||
}
|
||||
|
||||
func makeVote(header *types.Header, blockID types.BlockID, valset *types.ValidatorSet, privVal types.PrivValidator) *types.Vote {
|
||||
addr := privVal.GetAddress()
|
||||
idx, _ := valset.GetByAddress(addr)
|
||||
vote := &types.Vote{
|
||||
ValidatorAddress: addr,
|
||||
ValidatorIndex: idx,
|
||||
Height: header.Height,
|
||||
Round: 1,
|
||||
Timestamp: tmtime.Now(),
|
||||
Type: types.PrecommitType,
|
||||
BlockID: blockID,
|
||||
}
|
||||
|
||||
privVal.SignVote(header.ChainID, vote)
|
||||
|
||||
return vote
|
||||
}
|
||||
|
||||
type BlockchainReactorPair struct {
|
||||
reactor *BlockchainReactor
|
||||
app proxy.AppConns
|
||||
}
|
||||
|
||||
func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, maxBlockHeight int64) BlockchainReactorPair {
|
||||
if len(privVals) != 1 {
|
||||
panic("only support one validator")
|
||||
}
|
||||
|
||||
app := &testApp{}
|
||||
cc := proxy.NewLocalClientCreator(app)
|
||||
proxyApp := proxy.NewAppConns(cc)
|
||||
err := proxyApp.Start()
|
||||
if err != nil {
|
||||
panic(cmn.ErrorWrap(err, "error start app"))
|
||||
}
|
||||
|
||||
blockDB := dbm.NewMemDB()
|
||||
stateDB := dbm.NewMemDB()
|
||||
blockStore := NewBlockStore(blockDB)
|
||||
state, err := sm.LoadStateFromDBOrGenesisFile(stateDB, config.GenesisFile())
|
||||
|
||||
state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc)
|
||||
if err != nil {
|
||||
panic(cmn.ErrorWrap(err, "error constructing state from genesis file"))
|
||||
}
|
||||
return state, blockStore
|
||||
}
|
||||
|
||||
func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainReactor {
|
||||
state, blockStore := makeStateAndBlockStore(logger)
|
||||
|
||||
// Make the blockchainReactor itself
|
||||
// Make the BlockchainReactor itself.
|
||||
// NOTE we have to create and commit the blocks first because
|
||||
// pool.height is determined from the store.
|
||||
fastSync := true
|
||||
var nilApp proxy.AppConnConsensus
|
||||
blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), nilApp,
|
||||
blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), proxyApp.Consensus(),
|
||||
sm.MockMempool{}, sm.MockEvidencePool{})
|
||||
|
||||
// let's add some blocks in
|
||||
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
|
||||
lastCommit := &types.Commit{}
|
||||
if blockHeight > 1 {
|
||||
lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
|
||||
lastBlock := blockStore.LoadBlock(blockHeight - 1)
|
||||
|
||||
vote := makeVote(&lastBlock.Header, lastBlockMeta.BlockID, state.Validators, privVals[0])
|
||||
lastCommit = &types.Commit{Precommits: []*types.Vote{vote}, BlockID: lastBlockMeta.BlockID}
|
||||
}
|
||||
|
||||
thisBlock := makeBlock(blockHeight, state, lastCommit)
|
||||
|
||||
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
|
||||
blockID := types.BlockID{thisBlock.Hash(), thisParts.Header()}
|
||||
|
||||
state, err = blockExec.ApplyBlock(state, blockID, thisBlock)
|
||||
if err != nil {
|
||||
panic(cmn.ErrorWrap(err, "error apply block"))
|
||||
}
|
||||
|
||||
blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
|
||||
}
|
||||
|
||||
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
|
||||
bcReactor.SetLogger(logger.With("module", "blockchain"))
|
||||
|
||||
// Next: we need to set a switch in order for peers to be added in
|
||||
bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig(), nil)
|
||||
|
||||
// Lastly: let's add some blocks in
|
||||
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
|
||||
firstBlock := makeBlock(blockHeight, state)
|
||||
secondBlock := makeBlock(blockHeight+1, state)
|
||||
firstParts := firstBlock.MakePartSet(types.BlockPartSizeBytes)
|
||||
blockStore.SaveBlock(firstBlock, firstParts, secondBlock.LastCommit)
|
||||
}
|
||||
|
||||
return bcReactor
|
||||
return BlockchainReactorPair{bcReactor, proxyApp}
|
||||
}
|
||||
|
||||
func TestNoBlockResponse(t *testing.T) {
|
||||
maxBlockHeight := int64(20)
|
||||
config = cfg.ResetTestRoot("blockchain_reactor_test")
|
||||
genDoc, privVals := randGenesisDoc(1, false, 30)
|
||||
|
||||
bcr := newBlockchainReactor(log.TestingLogger(), maxBlockHeight)
|
||||
bcr.Start()
|
||||
defer bcr.Stop()
|
||||
maxBlockHeight := int64(65)
|
||||
|
||||
// Add some peers in
|
||||
peer := newbcrTestPeer(p2p.ID(cmn.RandStr(12)))
|
||||
bcr.AddPeer(peer)
|
||||
reactorPairs := make([]BlockchainReactorPair, 2)
|
||||
|
||||
chID := byte(0x01)
|
||||
reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
|
||||
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
|
||||
|
||||
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
|
||||
return s
|
||||
|
||||
}, p2p.Connect2Switches)
|
||||
|
||||
defer func() {
|
||||
for _, r := range reactorPairs {
|
||||
r.reactor.Stop()
|
||||
r.app.Stop()
|
||||
}
|
||||
}()
|
||||
|
||||
tests := []struct {
|
||||
height int64
|
||||
@@ -78,72 +157,100 @@ func TestNoBlockResponse(t *testing.T) {
|
||||
{100, false},
|
||||
}
|
||||
|
||||
// receive a request message from peer,
|
||||
// wait for our response to be received on the peer
|
||||
for _, tt := range tests {
|
||||
reqBlockMsg := &bcBlockRequestMessage{tt.height}
|
||||
reqBlockBytes := cdc.MustMarshalBinaryBare(reqBlockMsg)
|
||||
bcr.Receive(chID, peer, reqBlockBytes)
|
||||
msg := peer.lastBlockchainMessage()
|
||||
for {
|
||||
if reactorPairs[1].reactor.pool.IsCaughtUp() {
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height())
|
||||
|
||||
for _, tt := range tests {
|
||||
block := reactorPairs[1].reactor.store.LoadBlock(tt.height)
|
||||
if tt.existent {
|
||||
if blockMsg, ok := msg.(*bcBlockResponseMessage); !ok {
|
||||
t.Fatalf("Expected to receive a block response for height %d", tt.height)
|
||||
} else if blockMsg.Block.Height != tt.height {
|
||||
t.Fatalf("Expected response to be for height %d, got %d", tt.height, blockMsg.Block.Height)
|
||||
}
|
||||
assert.True(t, block != nil)
|
||||
} else {
|
||||
if noBlockMsg, ok := msg.(*bcNoBlockResponseMessage); !ok {
|
||||
t.Fatalf("Expected to receive a no block response for height %d", tt.height)
|
||||
} else if noBlockMsg.Height != tt.height {
|
||||
t.Fatalf("Expected response to be for height %d, got %d", tt.height, noBlockMsg.Height)
|
||||
}
|
||||
assert.True(t, block == nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
// NOTE: This is too hard to test without
|
||||
// an easy way to add test peer to switch
|
||||
// or without significant refactoring of the module.
|
||||
// Alternatively we could actually dial a TCP conn but
|
||||
// that seems extreme.
|
||||
func TestBadBlockStopsPeer(t *testing.T) {
|
||||
maxBlockHeight := int64(20)
|
||||
config = cfg.ResetTestRoot("blockchain_reactor_test")
|
||||
genDoc, privVals := randGenesisDoc(1, false, 30)
|
||||
|
||||
bcr := newBlockchainReactor(log.TestingLogger(), maxBlockHeight)
|
||||
bcr.Start()
|
||||
defer bcr.Stop()
|
||||
maxBlockHeight := int64(148)
|
||||
|
||||
// Add some peers in
|
||||
peer := newbcrTestPeer(p2p.ID(cmn.RandStr(12)))
|
||||
otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
|
||||
defer func() {
|
||||
otherChain.reactor.Stop()
|
||||
otherChain.app.Stop()
|
||||
}()
|
||||
|
||||
// XXX: This doesn't add the peer to anything,
|
||||
// so it's hard to check that it's later removed
|
||||
bcr.AddPeer(peer)
|
||||
assert.True(t, bcr.Switch.Peers().Size() > 0)
|
||||
reactorPairs := make([]BlockchainReactorPair, 4)
|
||||
|
||||
// send a bad block from the peer
|
||||
// default blocks already dont have commits, so should fail
|
||||
block := bcr.store.LoadBlock(3)
|
||||
msg := &bcBlockResponseMessage{Block: block}
|
||||
peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
|
||||
reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
|
||||
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
|
||||
reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
|
||||
reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
|
||||
|
||||
ticker := time.NewTicker(time.Millisecond * 10)
|
||||
timer := time.NewTimer(time.Second * 2)
|
||||
LOOP:
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if bcr.Switch.Peers().Size() == 0 {
|
||||
break LOOP
|
||||
}
|
||||
case <-timer.C:
|
||||
t.Fatal("Timed out waiting to disconnect peer")
|
||||
switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
|
||||
return s
|
||||
|
||||
}, p2p.Connect2Switches)
|
||||
|
||||
defer func() {
|
||||
for _, r := range reactorPairs {
|
||||
r.reactor.Stop()
|
||||
r.app.Stop()
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
if reactorPairs[3].reactor.pool.IsCaughtUp() {
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
//at this time, reactors[0-3] is the newest
|
||||
assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size())
|
||||
|
||||
//mark reactorPairs[3] is an invalid peer
|
||||
reactorPairs[3].reactor.store = otherChain.reactor.store
|
||||
|
||||
lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
|
||||
reactorPairs = append(reactorPairs, lastReactorPair)
|
||||
|
||||
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
|
||||
return s
|
||||
|
||||
}, p2p.Connect2Switches)...)
|
||||
|
||||
for i := 0; i < len(reactorPairs)-1; i++ {
|
||||
p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
|
||||
}
|
||||
|
||||
for {
|
||||
if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
|
||||
}
|
||||
*/
|
||||
|
||||
//----------------------------------------------
|
||||
// utility funcs
|
||||
@@ -155,56 +262,41 @@ func makeTxs(height int64) (txs []types.Tx) {
|
||||
return txs
|
||||
}
|
||||
|
||||
func makeBlock(height int64, state sm.State) *types.Block {
|
||||
block, _ := state.MakeBlock(height, makeTxs(height), new(types.Commit), nil, state.Validators.GetProposer().Address)
|
||||
func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block {
|
||||
block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address)
|
||||
return block
|
||||
}
|
||||
|
||||
// The Test peer
|
||||
type bcrTestPeer struct {
|
||||
cmn.BaseService
|
||||
id p2p.ID
|
||||
ch chan interface{}
|
||||
type testApp struct {
|
||||
abci.BaseApplication
|
||||
}
|
||||
|
||||
var _ p2p.Peer = (*bcrTestPeer)(nil)
|
||||
var _ abci.Application = (*testApp)(nil)
|
||||
|
||||
func newbcrTestPeer(id p2p.ID) *bcrTestPeer {
|
||||
bcr := &bcrTestPeer{
|
||||
id: id,
|
||||
ch: make(chan interface{}, 2),
|
||||
}
|
||||
bcr.BaseService = *cmn.NewBaseService(nil, "bcrTestPeer", bcr)
|
||||
return bcr
|
||||
func (app *testApp) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) {
|
||||
return abci.ResponseInfo{}
|
||||
}
|
||||
|
||||
func (tp *bcrTestPeer) lastBlockchainMessage() interface{} { return <-tp.ch }
|
||||
|
||||
func (tp *bcrTestPeer) TrySend(chID byte, msgBytes []byte) bool {
|
||||
var msg BlockchainMessage
|
||||
err := cdc.UnmarshalBinaryBare(msgBytes, &msg)
|
||||
if err != nil {
|
||||
panic(cmn.ErrorWrap(err, "Error while trying to parse a BlockchainMessage"))
|
||||
}
|
||||
if _, ok := msg.(*bcStatusResponseMessage); ok {
|
||||
// Discard status response messages since they skew our results
|
||||
// We only want to deal with:
|
||||
// + bcBlockResponseMessage
|
||||
// + bcNoBlockResponseMessage
|
||||
} else {
|
||||
tp.ch <- msg
|
||||
}
|
||||
return true
|
||||
func (app *testApp) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock {
|
||||
return abci.ResponseBeginBlock{}
|
||||
}
|
||||
|
||||
func (tp *bcrTestPeer) FlushStop() {}
|
||||
func (tp *bcrTestPeer) Send(chID byte, msgBytes []byte) bool { return tp.TrySend(chID, msgBytes) }
|
||||
func (tp *bcrTestPeer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{} }
|
||||
func (tp *bcrTestPeer) Status() p2p.ConnectionStatus { return p2p.ConnectionStatus{} }
|
||||
func (tp *bcrTestPeer) ID() p2p.ID { return tp.id }
|
||||
func (tp *bcrTestPeer) IsOutbound() bool { return false }
|
||||
func (tp *bcrTestPeer) IsPersistent() bool { return true }
|
||||
func (tp *bcrTestPeer) Get(s string) interface{} { return s }
|
||||
func (tp *bcrTestPeer) Set(string, interface{}) {}
|
||||
func (tp *bcrTestPeer) RemoteIP() net.IP { return []byte{127, 0, 0, 1} }
|
||||
func (tp *bcrTestPeer) OriginalAddr() *p2p.NetAddress { return nil }
|
||||
func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
|
||||
return abci.ResponseEndBlock{}
|
||||
}
|
||||
|
||||
func (app *testApp) DeliverTx(tx []byte) abci.ResponseDeliverTx {
|
||||
return abci.ResponseDeliverTx{Tags: []cmn.KVPair{}}
|
||||
}
|
||||
|
||||
func (app *testApp) CheckTx(tx []byte) abci.ResponseCheckTx {
|
||||
return abci.ResponseCheckTx{}
|
||||
}
|
||||
|
||||
func (app *testApp) Commit() abci.ResponseCommit {
|
||||
return abci.ResponseCommit{}
|
||||
}
|
||||
|
||||
func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQuery) {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -9,13 +9,30 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/db"
|
||||
dbm "github.com/tendermint/tendermint/libs/db"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
|
||||
"github.com/tendermint/tendermint/types"
|
||||
tmtime "github.com/tendermint/tendermint/types/time"
|
||||
)
|
||||
|
||||
func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore) {
|
||||
config := cfg.ResetTestRoot("blockchain_reactor_test")
|
||||
// blockDB := dbm.NewDebugDB("blockDB", dbm.NewMemDB())
|
||||
// stateDB := dbm.NewDebugDB("stateDB", dbm.NewMemDB())
|
||||
blockDB := dbm.NewMemDB()
|
||||
stateDB := dbm.NewMemDB()
|
||||
state, err := sm.LoadStateFromDBOrGenesisFile(stateDB, config.GenesisFile())
|
||||
if err != nil {
|
||||
panic(cmn.ErrorWrap(err, "error constructing state from genesis file"))
|
||||
}
|
||||
return state, NewBlockStore(blockDB)
|
||||
}
|
||||
|
||||
func TestLoadBlockStoreStateJSON(t *testing.T) {
|
||||
db := db.NewMemDB()
|
||||
|
||||
@@ -65,7 +82,7 @@ func freshBlockStore() (*BlockStore, db.DB) {
|
||||
var (
|
||||
state, _ = makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer)))
|
||||
|
||||
block = makeBlock(1, state)
|
||||
block = makeBlock(1, state, new(types.Commit))
|
||||
partSet = block.MakePartSet(2)
|
||||
part1 = partSet.GetPart(0)
|
||||
part2 = partSet.GetPart(1)
|
||||
@@ -88,7 +105,7 @@ func TestBlockStoreSaveLoadBlock(t *testing.T) {
|
||||
}
|
||||
|
||||
// save a block
|
||||
block := makeBlock(bs.Height()+1, state)
|
||||
block := makeBlock(bs.Height()+1, state, new(types.Commit))
|
||||
validPartSet := block.MakePartSet(2)
|
||||
seenCommit := &types.Commit{Precommits: []*types.Vote{{Height: 10,
|
||||
Timestamp: tmtime.Now()}}}
|
||||
@@ -331,7 +348,7 @@ func TestLoadBlockMeta(t *testing.T) {
|
||||
func TestBlockFetchAtHeight(t *testing.T) {
|
||||
state, bs := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer)))
|
||||
require.Equal(t, bs.Height(), int64(0), "initially the height should be zero")
|
||||
block := makeBlock(bs.Height()+1, state)
|
||||
block := makeBlock(bs.Height()+1, state, new(types.Commit))
|
||||
|
||||
partSet := block.MakePartSet(2)
|
||||
seenCommit := &types.Commit{Precommits: []*types.Vote{{Height: 10,
|
||||
|
||||
@@ -260,6 +260,9 @@ create_empty_blocks_interval = "{{ .Consensus.CreateEmptyBlocksInterval }}"
|
||||
peer_gossip_sleep_duration = "{{ .Consensus.PeerGossipSleepDuration }}"
|
||||
peer_query_maj23_sleep_duration = "{{ .Consensus.PeerQueryMaj23SleepDuration }}"
|
||||
|
||||
# Block time parameters. Corresponds to the minimum time increment between consecutive blocks.
|
||||
blocktime_iota = "{{ .Consensus.BlockTimeIota }}"
|
||||
|
||||
##### transactions indexer configuration options #####
|
||||
[tx_index]
|
||||
|
||||
|
||||
@@ -72,18 +72,18 @@ func TestMempoolProgressInHigherRound(t *testing.T) {
|
||||
startTestRound(cs, height, round)
|
||||
|
||||
ensureNewRound(newRoundCh, height, round) // first round at first height
|
||||
ensureNewEventOnChannel(newBlockCh) // first block gets committed
|
||||
ensureNewEventOnChannel(newBlockCh) // first block gets committed
|
||||
|
||||
height = height + 1 // moving to the next height
|
||||
round = 0
|
||||
|
||||
ensureNewRound(newRoundCh, height, round) // first round at next height
|
||||
deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round
|
||||
deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round
|
||||
ensureNewTimeout(timeoutCh, height, round, cs.config.TimeoutPropose.Nanoseconds())
|
||||
|
||||
round = round + 1 // moving to the next round
|
||||
round = round + 1 // moving to the next round
|
||||
ensureNewRound(newRoundCh, height, round) // wait for the next round
|
||||
ensureNewEventOnChannel(newBlockCh) // now we can commit the block
|
||||
ensureNewEventOnChannel(newBlockCh) // now we can commit the block
|
||||
}
|
||||
|
||||
func deliverTxsRange(cs *ConsensusState, start, end int) {
|
||||
|
||||
@@ -276,7 +276,12 @@ func (h *Handshaker) ReplayBlocks(
|
||||
|
||||
// If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain.
|
||||
if appBlockHeight == 0 {
|
||||
nextVals := types.TM2PB.ValidatorUpdates(state.NextValidators) // state.Validators would work too.
|
||||
validators := make([]*types.Validator, len(h.genDoc.Validators))
|
||||
for i, val := range h.genDoc.Validators {
|
||||
validators[i] = types.NewValidator(val.PubKey, val.Power)
|
||||
}
|
||||
validatorSet := types.NewValidatorSet(validators)
|
||||
nextVals := types.TM2PB.ValidatorUpdates(validatorSet)
|
||||
csParams := types.TM2PB.ConsensusParams(h.genDoc.ConsensusParams)
|
||||
req := abci.RequestInitChain{
|
||||
Time: h.genDoc.GenesisTime,
|
||||
|
||||
@@ -315,28 +315,21 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
|
||||
config := ResetConfig("proxy_test_")
|
||||
|
||||
walBody, err := WALWithNBlocks(NUM_BLOCKS)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
walFile := tempWALWithData(walBody)
|
||||
config.Consensus.SetWalFile(walFile)
|
||||
|
||||
privVal := privval.LoadFilePV(config.PrivValidatorFile())
|
||||
|
||||
wal, err := NewWAL(walFile)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
wal.SetLogger(log.TestingLogger())
|
||||
if err := wal.Start(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = wal.Start()
|
||||
require.NoError(t, err)
|
||||
defer wal.Stop()
|
||||
|
||||
chain, commits, err := makeBlockchainFromWAL(wal)
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
stateDB, state, store := stateAndStore(config, privVal.GetPubKey(), kvstore.ProtocolVersion)
|
||||
store.chain = chain
|
||||
|
||||
@@ -324,10 +324,11 @@ func (cs *ConsensusState) startRoutines(maxSteps int) {
|
||||
go cs.receiveRoutine(maxSteps)
|
||||
}
|
||||
|
||||
// OnStop implements cmn.Service. It stops all routines and waits for the WAL to finish.
|
||||
// OnStop implements cmn.Service.
|
||||
func (cs *ConsensusState) OnStop() {
|
||||
cs.evsw.Stop()
|
||||
cs.timeoutTicker.Stop()
|
||||
// WAL is stopped in receiveRoutine.
|
||||
}
|
||||
|
||||
// Wait waits for the the main routine to return.
|
||||
|
||||
@@ -1043,7 +1043,7 @@ func TestNoHearbeatWhenNotValidator(t *testing.T) {
|
||||
cs.Stop()
|
||||
|
||||
// if a faulty implementation sends an event, we should wait here a little bit to make sure we don't miss it by prematurely leaving the test method
|
||||
time.Sleep((proposalHeartbeatIntervalSeconds + 1) * time.Second)
|
||||
time.Sleep((proposalHeartbeatIntervalSeconds + 1) * time.Second)
|
||||
}
|
||||
|
||||
// regression for #2518
|
||||
|
||||
@@ -55,3 +55,31 @@ func (prs PeerRoundState) StringIndented(indent string) string {
|
||||
indent, prs.CatchupCommit, prs.CatchupCommitRound,
|
||||
indent)
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------
|
||||
// These methods are for Protobuf Compatibility
|
||||
|
||||
// Size returns the size of the amino encoding, in bytes.
|
||||
func (ps *PeerRoundState) Size() int {
|
||||
bs, _ := ps.Marshal()
|
||||
return len(bs)
|
||||
}
|
||||
|
||||
// Marshal returns the amino encoding.
|
||||
func (ps *PeerRoundState) Marshal() ([]byte, error) {
|
||||
return cdc.MarshalBinaryBare(ps)
|
||||
}
|
||||
|
||||
// MarshalTo calls Marshal and copies to the given buffer.
|
||||
func (ps *PeerRoundState) MarshalTo(data []byte) (int, error) {
|
||||
bs, err := ps.Marshal()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
return copy(data, bs), nil
|
||||
}
|
||||
|
||||
// Unmarshal deserializes from amino encoded form.
|
||||
func (ps *PeerRoundState) Unmarshal(bs []byte) error {
|
||||
return cdc.UnmarshalBinaryBare(bs, ps)
|
||||
}
|
||||
|
||||
@@ -201,3 +201,31 @@ func (rs *RoundState) StringShort() string {
|
||||
return fmt.Sprintf(`RoundState{H:%v R:%v S:%v ST:%v}`,
|
||||
rs.Height, rs.Round, rs.Step, rs.StartTime)
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------
|
||||
// These methods are for Protobuf Compatibility
|
||||
|
||||
// Size returns the size of the amino encoding, in bytes.
|
||||
func (rs *RoundStateSimple) Size() int {
|
||||
bs, _ := rs.Marshal()
|
||||
return len(bs)
|
||||
}
|
||||
|
||||
// Marshal returns the amino encoding.
|
||||
func (rs *RoundStateSimple) Marshal() ([]byte, error) {
|
||||
return cdc.MarshalBinaryBare(rs)
|
||||
}
|
||||
|
||||
// MarshalTo calls Marshal and copies to the given buffer.
|
||||
func (rs *RoundStateSimple) MarshalTo(data []byte) (int, error) {
|
||||
bs, err := rs.Marshal()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
return copy(data, bs), nil
|
||||
}
|
||||
|
||||
// Unmarshal deserializes from amino encoded form.
|
||||
func (rs *RoundStateSimple) Unmarshal(bs []byte) error {
|
||||
return cdc.UnmarshalBinaryBare(bs, rs)
|
||||
}
|
||||
|
||||
@@ -7,13 +7,13 @@ import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
// "sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/consensus/types"
|
||||
"github.com/tendermint/tendermint/libs/autofile"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmtypes "github.com/tendermint/tendermint/types"
|
||||
tmtime "github.com/tendermint/tendermint/types/time"
|
||||
|
||||
@@ -23,29 +23,27 @@ import (
|
||||
|
||||
func TestWALTruncate(t *testing.T) {
|
||||
walDir, err := ioutil.TempDir("", "wal")
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("failed to create temp WAL file: %v", err))
|
||||
}
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(walDir)
|
||||
|
||||
walFile := filepath.Join(walDir, "wal")
|
||||
|
||||
//this magic number 4K can truncate the content when RotateFile. defaultHeadSizeLimit(10M) is hard to simulate.
|
||||
//this magic number 1 * time.Millisecond make RotateFile check frequently. defaultGroupCheckDuration(5s) is hard to simulate.
|
||||
wal, err := NewWAL(walFile, autofile.GroupHeadSizeLimit(4096), autofile.GroupCheckDuration(1*time.Millisecond))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
wal.Start()
|
||||
wal, err := NewWAL(walFile,
|
||||
autofile.GroupHeadSizeLimit(4096),
|
||||
autofile.GroupCheckDuration(1*time.Millisecond),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
wal.SetLogger(log.TestingLogger())
|
||||
err = wal.Start()
|
||||
require.NoError(t, err)
|
||||
defer wal.Stop()
|
||||
|
||||
//60 block's size nearly 70K, greater than group's headBuf size(4096 * 10), when headBuf is full, truncate content will Flush to the file.
|
||||
//at this time, RotateFile is called, truncate content exist in each file.
|
||||
err = WALGenerateNBlocks(wal.Group(), 60)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(1 * time.Millisecond) //wait groupCheckDuration, make sure RotateFile run
|
||||
|
||||
@@ -99,9 +97,8 @@ func TestWALSearchForEndHeight(t *testing.T) {
|
||||
walFile := tempWALWithData(walBody)
|
||||
|
||||
wal, err := NewWAL(walFile)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
wal.SetLogger(log.TestingLogger())
|
||||
|
||||
h := int64(3)
|
||||
gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{})
|
||||
|
||||
@@ -5,14 +5,17 @@ implementations:
|
||||
|
||||
- FilePV uses an unencrypted private key in a "priv_validator.json" file - no
|
||||
configuration required (just `tendermint init`).
|
||||
- SocketPV uses a socket to send signing requests to another process - user is
|
||||
responsible for starting that process themselves.
|
||||
- TCPVal and IPCVal use TCP and Unix sockets respectively to send signing requests
|
||||
to another process - the user is responsible for starting that process themselves.
|
||||
|
||||
The SocketPV address can be provided via flags at the command line - doing so
|
||||
will cause Tendermint to ignore any "priv_validator.json" file and to listen on
|
||||
the given address for incoming connections from an external priv_validator
|
||||
process. It will halt any operation until at least one external process
|
||||
succesfully connected.
|
||||
Both TCPVal and IPCVal addresses can be provided via flags at the command line
|
||||
or in the configuration file; TCPVal addresses must be of the form
|
||||
`tcp://<ip_address>:<port>` and IPCVal addresses `unix:///path/to/file.sock` -
|
||||
doing so will cause Tendermint to ignore any private validator files.
|
||||
|
||||
TCPVal will listen on the given address for incoming connections from an external
|
||||
private validator process. It will halt any operation until at least one external
|
||||
process successfully connected.
|
||||
|
||||
The external priv_validator process will dial the address to connect to
|
||||
Tendermint, and then Tendermint will send requests on the ensuing connection to
|
||||
@@ -21,6 +24,9 @@ but the Tendermint process makes all requests. In a later stage we're going to
|
||||
support multiple validators for fault tolerance. To prevent double signing they
|
||||
need to be synced, which is deferred to an external solution (see #1185).
|
||||
|
||||
Conversely, IPCVal will make an outbound connection to an existing socket opened
|
||||
by the external validator process.
|
||||
|
||||
In addition, Tendermint will provide implementations that can be run in that
|
||||
external process. These include:
|
||||
|
||||
|
||||
@@ -95,9 +95,9 @@ wget https://github.com/google/leveldb/archive/v1.20.tar.gz && \
|
||||
tar -zxvf v1.20.tar.gz && \
|
||||
cd leveldb-1.20/ && \
|
||||
make && \
|
||||
cp -r out-static/lib* out-shared/lib* /usr/local/lib/ && \
|
||||
sudo cp -r out-static/lib* out-shared/lib* /usr/local/lib/ && \
|
||||
cd include/ && \
|
||||
cp -r leveldb /usr/local/include/ && \
|
||||
sudo cp -r leveldb /usr/local/include/ && \
|
||||
sudo ldconfig && \
|
||||
rm -f v1.20.tar.gz
|
||||
```
|
||||
@@ -109,8 +109,8 @@ Set database backend to cleveldb:
|
||||
db_backend = "cleveldb"
|
||||
```
|
||||
|
||||
To build Tendermint, run
|
||||
To install Tendermint, run
|
||||
|
||||
```
|
||||
CGO_LDFLAGS="-lsnappy" go build -ldflags "-X github.com/tendermint/tendermint/version.GitCommit=`git rev-parse --short=8 HEAD`" -tags "tendermint gcc" -o build/tendermint ./cmd/tendermint/
|
||||
CGO_LDFLAGS="-lsnappy" go install -ldflags "-X github.com/tendermint/tendermint/version.GitCommit=`git rev-parse --short=8 HEAD`" -tags "tendermint gcc" -o build/tendermint ./cmd/tendermint/
|
||||
```
|
||||
|
||||
@@ -45,7 +45,9 @@ include a `Tags` field in their `Response*`. Each tag is key-value pair denoting
|
||||
something about what happened during the methods execution.
|
||||
|
||||
Tags can be used to index transactions and blocks according to what happened
|
||||
during their execution.
|
||||
during their execution. Note that the set of tags returned for a block from
|
||||
`BeginBlock` and `EndBlock` are merged. In case both methods return the same
|
||||
tag, only the value defined in `EndBlock` is used.
|
||||
|
||||
Keys and values in tags must be UTF-8 encoded strings (e.g.
|
||||
"account.owner": "Bob", "balance": "100.0",
|
||||
|
||||
@@ -59,22 +59,14 @@ You can simply use below table and concatenate Prefix || Length (of raw bytes) |
|
||||
| PubKeySecp256k1 | tendermint/PubKeySecp256k1 | 0xEB5AE987 | 0x21 | |
|
||||
| PrivKeyEd25519 | tendermint/PrivKeyEd25519 | 0xA3288910 | 0x40 | |
|
||||
| PrivKeySecp256k1 | tendermint/PrivKeySecp256k1 | 0xE1B0F79B | 0x20 | |
|
||||
| SignatureEd25519 | tendermint/SignatureEd25519 | 0x2031EA53 | 0x40 | |
|
||||
| SignatureSecp256k1 | tendermint/SignatureSecp256k1 | 0x7FC4A495 | variable |
|
||||
| PubKeyMultisigThreshold | tendermint/PubKeyMultisigThreshold | 0x22C1F7E2 | variable | |
|
||||
|
||||
|
|
||||
### Example
|
||||
|
||||
### Examples
|
||||
|
||||
1. For example, the 33-byte (or 0x21-byte in hex) Secp256k1 pubkey
|
||||
For example, the 33-byte (or 0x21-byte in hex) Secp256k1 pubkey
|
||||
`020BD40F225A57ED383B440CF073BC5539D0341F5767D2BF2D78406D00475A2EE9`
|
||||
would be encoded as
|
||||
`EB5AE98221020BD40F225A57ED383B440CF073BC5539D0341F5767D2BF2D78406D00475A2EE9`
|
||||
|
||||
2. For example, the variable size Secp256k1 signature (in this particular example 70 or 0x46 bytes)
|
||||
`304402201CD4B8C764D2FD8AF23ECFE6666CA8A53886D47754D951295D2D311E1FEA33BF02201E0F906BB1CF2C30EAACFFB032A7129358AFF96B9F79B06ACFFB18AC90C2ADD7`
|
||||
would be encoded as
|
||||
`16E1FEEA46304402201CD4B8C764D2FD8AF23ECFE6666CA8A53886D47754D951295D2D311E1FEA33BF02201E0F906BB1CF2C30EAACFFB032A7129358AFF96B9F79B06ACFFB18AC90C2ADD7`
|
||||
`EB5AE98721020BD40F225A57ED383B440CF073BC5539D0341F5767D2BF2D78406D00475A2EE9`
|
||||
|
||||
### Addresses
|
||||
|
||||
|
||||
@@ -65,24 +65,24 @@ type Requester {
|
||||
mtx Mutex
|
||||
block Block
|
||||
height int64
|
||||
peerID p2p.ID
|
||||
redoChannel chan struct{}
|
||||
peerID p2p.ID
|
||||
redoChannel chan p2p.ID //redo may send multi-time; peerId is used to identify repeat
|
||||
}
|
||||
```
|
||||
|
||||
Pool is core data structure that stores last executed block (`height`), assignment of requests to peers (`requesters`), current height for each peer and number of pending requests for each peer (`peers`), maximum peer height, etc.
|
||||
Pool is a core data structure that stores last executed block (`height`), assignment of requests to peers (`requesters`), current height for each peer and number of pending requests for each peer (`peers`), maximum peer height, etc.
|
||||
|
||||
```go
|
||||
type Pool {
|
||||
mtx Mutex
|
||||
requesters map[int64]*Requester
|
||||
height int64
|
||||
peers map[p2p.ID]*Peer
|
||||
maxPeerHeight int64
|
||||
numPending int32
|
||||
store BlockStore
|
||||
requestsChannel chan<- BlockRequest
|
||||
errorsChannel chan<- peerError
|
||||
mtx Mutex
|
||||
requesters map[int64]*Requester
|
||||
height int64
|
||||
peers map[p2p.ID]*Peer
|
||||
maxPeerHeight int64
|
||||
numPending int32
|
||||
store BlockStore
|
||||
requestsChannel chan<- BlockRequest
|
||||
errorsChannel chan<- peerError
|
||||
}
|
||||
```
|
||||
|
||||
@@ -90,11 +90,11 @@ Peer data structure stores for each peer current `height` and number of pending
|
||||
|
||||
```go
|
||||
type Peer struct {
|
||||
id p2p.ID
|
||||
height int64
|
||||
numPending int32
|
||||
timeout *time.Timer
|
||||
didTimeout bool
|
||||
id p2p.ID
|
||||
height int64
|
||||
numPending int32
|
||||
timeout *time.Timer
|
||||
didTimeout bool
|
||||
}
|
||||
```
|
||||
|
||||
@@ -169,11 +169,11 @@ Requester task is responsible for fetching a single block at position `height`.
|
||||
|
||||
```go
|
||||
fetchBlock(height, pool):
|
||||
while true do
|
||||
while true do {
|
||||
peerID = nil
|
||||
block = nil
|
||||
peer = pickAvailablePeer(height)
|
||||
peerId = peer.id
|
||||
peerID = peer.id
|
||||
|
||||
enqueue BlockRequest(height, peerID) to pool.requestsChannel
|
||||
redo = false
|
||||
@@ -181,12 +181,15 @@ fetchBlock(height, pool):
|
||||
select {
|
||||
upon receiving Quit message do
|
||||
return
|
||||
upon receiving message on redoChannel do
|
||||
mtx.Lock()
|
||||
pool.numPending++
|
||||
redo = true
|
||||
mtx.UnLock()
|
||||
upon receiving redo message with id on redoChannel do
|
||||
if peerID == id {
|
||||
mtx.Lock()
|
||||
pool.numPending++
|
||||
redo = true
|
||||
mtx.UnLock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pickAvailablePeer(height):
|
||||
selectedPeer = nil
|
||||
@@ -244,7 +247,7 @@ createRequesters(pool):
|
||||
main(pool):
|
||||
create trySyncTicker with interval trySyncIntervalMS
|
||||
create statusUpdateTicker with interval statusUpdateIntervalSeconds
|
||||
create switchToConsensusTicker with interbal switchToConsensusIntervalSeconds
|
||||
create switchToConsensusTicker with interval switchToConsensusIntervalSeconds
|
||||
|
||||
while true do
|
||||
select {
|
||||
|
||||
@@ -203,6 +203,9 @@ create_empty_blocks_interval = "0s"
|
||||
peer_gossip_sleep_duration = "100ms"
|
||||
peer_query_maj23_sleep_duration = "2000ms"
|
||||
|
||||
# Block time parameters. Corresponds to the minimum time increment between consecutive blocks.
|
||||
blocktime_iota = "1000ms"
|
||||
|
||||
##### transactions indexer configuration options #####
|
||||
[tx_index]
|
||||
|
||||
|
||||
@@ -163,7 +163,7 @@ func (evR EvidenceReactor) checkSendEvidenceMessage(peer p2p.Peer, ev types.Evid
|
||||
// make sure the peer is up to date
|
||||
evHeight := ev.Height()
|
||||
peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
|
||||
if !ok {
|
||||
if !ok {
|
||||
// Peer does not have a state yet. We set it in the consensus reactor, but
|
||||
// when we add peer in Switch, the order we call reactors#AddPeer is
|
||||
// different every time due to us using a map. Sometimes other reactors
|
||||
|
||||
@@ -119,4 +119,4 @@ func TestAutoFileSize(t *testing.T) {
|
||||
|
||||
// Cleanup
|
||||
_ = os.Remove(f.Name())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,7 +51,7 @@ func TestParseLogLevel(t *testing.T) {
|
||||
|
||||
buf.Reset()
|
||||
|
||||
logger.With("module", "wire").Debug("Kingpin")
|
||||
logger.With("module", "mempool").With("module", "wire").Debug("Kingpin")
|
||||
if have := strings.TrimSpace(buf.String()); c.expectedLogLines[0] != have {
|
||||
t.Errorf("\nwant '%s'\nhave '%s'\nlevel '%s'", c.expectedLogLines[0], have, c.lvl)
|
||||
}
|
||||
|
||||
@@ -11,9 +11,10 @@ const (
|
||||
)
|
||||
|
||||
type filter struct {
|
||||
next Logger
|
||||
allowed level // XOR'd levels for default case
|
||||
allowedKeyvals map[keyval]level // When key-value match, use this level
|
||||
next Logger
|
||||
allowed level // XOR'd levels for default case
|
||||
initiallyAllowed level // XOR'd levels for initial case
|
||||
allowedKeyvals map[keyval]level // When key-value match, use this level
|
||||
}
|
||||
|
||||
type keyval struct {
|
||||
@@ -33,6 +34,7 @@ func NewFilter(next Logger, options ...Option) Logger {
|
||||
for _, option := range options {
|
||||
option(l)
|
||||
}
|
||||
l.initiallyAllowed = l.allowed
|
||||
return l
|
||||
}
|
||||
|
||||
@@ -76,14 +78,45 @@ func (l *filter) Error(msg string, keyvals ...interface{}) {
|
||||
// logger = log.NewFilter(logger, log.AllowError(), log.AllowInfoWith("module", "crypto"), log.AllowNoneWith("user", "Sam"))
|
||||
// logger.With("user", "Sam").With("module", "crypto").Info("Hello") # produces "I... Hello module=crypto user=Sam"
|
||||
func (l *filter) With(keyvals ...interface{}) Logger {
|
||||
keyInAllowedKeyvals := false
|
||||
|
||||
for i := len(keyvals) - 2; i >= 0; i -= 2 {
|
||||
for kv, allowed := range l.allowedKeyvals {
|
||||
if keyvals[i] == kv.key && keyvals[i+1] == kv.value {
|
||||
return &filter{next: l.next.With(keyvals...), allowed: allowed, allowedKeyvals: l.allowedKeyvals}
|
||||
if keyvals[i] == kv.key {
|
||||
keyInAllowedKeyvals = true
|
||||
// Example:
|
||||
// logger = log.NewFilter(logger, log.AllowError(), log.AllowInfoWith("module", "crypto"))
|
||||
// logger.With("module", "crypto")
|
||||
if keyvals[i+1] == kv.value {
|
||||
return &filter{
|
||||
next: l.next.With(keyvals...),
|
||||
allowed: allowed, // set the desired level
|
||||
allowedKeyvals: l.allowedKeyvals,
|
||||
initiallyAllowed: l.initiallyAllowed,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return &filter{next: l.next.With(keyvals...), allowed: l.allowed, allowedKeyvals: l.allowedKeyvals}
|
||||
|
||||
// Example:
|
||||
// logger = log.NewFilter(logger, log.AllowError(), log.AllowInfoWith("module", "crypto"))
|
||||
// logger.With("module", "main")
|
||||
if keyInAllowedKeyvals {
|
||||
return &filter{
|
||||
next: l.next.With(keyvals...),
|
||||
allowed: l.initiallyAllowed, // return back to initially allowed
|
||||
allowedKeyvals: l.allowedKeyvals,
|
||||
initiallyAllowed: l.initiallyAllowed,
|
||||
}
|
||||
}
|
||||
|
||||
return &filter{
|
||||
next: l.next.With(keyvals...),
|
||||
allowed: l.allowed, // simply continue with the current level
|
||||
allowedKeyvals: l.allowedKeyvals,
|
||||
initiallyAllowed: l.initiallyAllowed,
|
||||
}
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
|
||||
@@ -131,7 +131,6 @@ type Mempool struct {
|
||||
proxyMtx sync.Mutex
|
||||
proxyAppConn proxy.AppConnMempool
|
||||
txs *clist.CList // concurrent linked-list of good txs
|
||||
counter int64 // simple incrementing counter
|
||||
height int64 // the last block Update()'d to
|
||||
rechecking int32 // for re-checking filtered txs on Update()
|
||||
recheckCursor *clist.CElement // next expected response
|
||||
@@ -167,7 +166,6 @@ func NewMempool(
|
||||
config: config,
|
||||
proxyAppConn: proxyAppConn,
|
||||
txs: clist.New(),
|
||||
counter: 0,
|
||||
height: height,
|
||||
rechecking: 0,
|
||||
recheckCursor: nil,
|
||||
@@ -365,9 +363,7 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
|
||||
postCheckErr = mem.postCheck(tx, r.CheckTx)
|
||||
}
|
||||
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
|
||||
mem.counter++
|
||||
memTx := &mempoolTx{
|
||||
counter: mem.counter,
|
||||
height: mem.height,
|
||||
gasWanted: r.CheckTx.GasWanted,
|
||||
tx: tx,
|
||||
@@ -378,7 +374,6 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
|
||||
"res", r,
|
||||
"height", memTx.height,
|
||||
"total", mem.Size(),
|
||||
"counter", memTx.counter,
|
||||
)
|
||||
mem.metrics.TxSizeBytes.Observe(float64(len(tx)))
|
||||
mem.notifyTxsAvailable()
|
||||
@@ -534,12 +529,6 @@ func (mem *Mempool) Update(
|
||||
preCheck PreCheckFunc,
|
||||
postCheck PostCheckFunc,
|
||||
) error {
|
||||
// First, create a lookup map of txns in new txs.
|
||||
txsMap := make(map[string]struct{}, len(txs))
|
||||
for _, tx := range txs {
|
||||
txsMap[string(tx)] = struct{}{}
|
||||
}
|
||||
|
||||
// Set height
|
||||
mem.height = height
|
||||
mem.notifiedTxsAvailable = false
|
||||
@@ -551,12 +540,18 @@ func (mem *Mempool) Update(
|
||||
mem.postCheck = postCheck
|
||||
}
|
||||
|
||||
// Remove transactions that are already in txs.
|
||||
goodTxs := mem.filterTxs(txsMap)
|
||||
// Add committed transactions to cache (if missing).
|
||||
for _, tx := range txs {
|
||||
_ = mem.cache.Push(tx)
|
||||
}
|
||||
|
||||
// Remove committed transactions.
|
||||
txsLeft := mem.removeTxs(txs)
|
||||
|
||||
// Recheck mempool txs if any txs were committed in the block
|
||||
if mem.config.Recheck && len(goodTxs) > 0 {
|
||||
mem.logger.Info("Recheck txs", "numtxs", len(goodTxs), "height", height)
|
||||
mem.recheckTxs(goodTxs)
|
||||
if mem.config.Recheck && len(txsLeft) > 0 {
|
||||
mem.logger.Info("Recheck txs", "numtxs", len(txsLeft), "height", height)
|
||||
mem.recheckTxs(txsLeft)
|
||||
// At this point, mem.txs are being rechecked.
|
||||
// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
|
||||
// Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
|
||||
@@ -568,12 +563,18 @@ func (mem *Mempool) Update(
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx {
|
||||
goodTxs := make([]types.Tx, 0, mem.txs.Len())
|
||||
func (mem *Mempool) removeTxs(txs types.Txs) []types.Tx {
|
||||
// Build a map for faster lookups.
|
||||
txsMap := make(map[string]struct{}, len(txs))
|
||||
for _, tx := range txs {
|
||||
txsMap[string(tx)] = struct{}{}
|
||||
}
|
||||
|
||||
txsLeft := make([]types.Tx, 0, mem.txs.Len())
|
||||
for e := mem.txs.Front(); e != nil; e = e.Next() {
|
||||
memTx := e.Value.(*mempoolTx)
|
||||
// Remove the tx if it's alredy in a block.
|
||||
if _, ok := blockTxsMap[string(memTx.tx)]; ok {
|
||||
// Remove the tx if it's already in a block.
|
||||
if _, ok := txsMap[string(memTx.tx)]; ok {
|
||||
// remove from clist
|
||||
mem.txs.Remove(e)
|
||||
e.DetachPrev()
|
||||
@@ -581,15 +582,14 @@ func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx {
|
||||
// NOTE: we don't remove committed txs from the cache.
|
||||
continue
|
||||
}
|
||||
// Good tx!
|
||||
goodTxs = append(goodTxs, memTx.tx)
|
||||
txsLeft = append(txsLeft, memTx.tx)
|
||||
}
|
||||
return goodTxs
|
||||
return txsLeft
|
||||
}
|
||||
|
||||
// NOTE: pass in goodTxs because mem.txs can mutate concurrently.
|
||||
func (mem *Mempool) recheckTxs(goodTxs []types.Tx) {
|
||||
if len(goodTxs) == 0 {
|
||||
// NOTE: pass in txs because mem.txs can mutate concurrently.
|
||||
func (mem *Mempool) recheckTxs(txs []types.Tx) {
|
||||
if len(txs) == 0 {
|
||||
return
|
||||
}
|
||||
atomic.StoreInt32(&mem.rechecking, 1)
|
||||
@@ -598,7 +598,7 @@ func (mem *Mempool) recheckTxs(goodTxs []types.Tx) {
|
||||
|
||||
// Push txs to proxyAppConn
|
||||
// NOTE: resCb() may be called concurrently.
|
||||
for _, tx := range goodTxs {
|
||||
for _, tx := range txs {
|
||||
mem.proxyAppConn.CheckTxAsync(tx)
|
||||
}
|
||||
mem.proxyAppConn.FlushAsync()
|
||||
@@ -608,7 +608,6 @@ func (mem *Mempool) recheckTxs(goodTxs []types.Tx) {
|
||||
|
||||
// mempoolTx is a transaction that successfully ran
|
||||
type mempoolTx struct {
|
||||
counter int64 // a simple incrementing counter
|
||||
height int64 // height that this tx had been validated in
|
||||
gasWanted int64 // amount of gas this tx states it will require
|
||||
tx types.Tx //
|
||||
|
||||
@@ -163,6 +163,17 @@ func TestMempoolFilters(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMempoolUpdateAddsTxsToCache(t *testing.T) {
|
||||
app := kvstore.NewKVStoreApplication()
|
||||
cc := proxy.NewLocalClientCreator(app)
|
||||
mempool := newMempoolWithApp(cc)
|
||||
mempool.Update(1, []types.Tx{[]byte{0x01}}, nil, nil)
|
||||
err := mempool.CheckTx([]byte{0x01}, nil)
|
||||
if assert.Error(t, err) {
|
||||
assert.Equal(t, ErrTxInCache, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTxsAvailable(t *testing.T) {
|
||||
app := kvstore.NewKVStoreApplication()
|
||||
cc := proxy.NewLocalClientCreator(app)
|
||||
|
||||
63
node/node.go
63
node/node.go
@@ -3,7 +3,6 @@ package node
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
@@ -11,11 +10,12 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/rs/cors"
|
||||
|
||||
"github.com/tendermint/go-amino"
|
||||
amino "github.com/tendermint/go-amino"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
bc "github.com/tendermint/tendermint/blockchain"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
@@ -220,25 +220,14 @@ func NewNode(config *cfg.Config,
|
||||
)
|
||||
}
|
||||
|
||||
// If an address is provided, listen on the socket for a
|
||||
// connection from an external signing process.
|
||||
if config.PrivValidatorListenAddr != "" {
|
||||
var (
|
||||
// TODO: persist this key so external signer
|
||||
// can actually authenticate us
|
||||
privKey = ed25519.GenPrivKey()
|
||||
pvsc = privval.NewTCPVal(
|
||||
logger.With("module", "privval"),
|
||||
config.PrivValidatorListenAddr,
|
||||
privKey,
|
||||
)
|
||||
)
|
||||
|
||||
if err := pvsc.Start(); err != nil {
|
||||
return nil, fmt.Errorf("Error starting private validator client: %v", err)
|
||||
// If an address is provided, listen on the socket for a connection from an
|
||||
// external signing process.
|
||||
// FIXME: we should start services inside OnStart
|
||||
privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, logger)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Error with private validator socket client")
|
||||
}
|
||||
|
||||
privValidator = pvsc
|
||||
}
|
||||
|
||||
// Decide whether to fast-sync or not
|
||||
@@ -600,10 +589,8 @@ func (n *Node) OnStop() {
|
||||
}
|
||||
}
|
||||
|
||||
if pvsc, ok := n.privValidator.(*privval.TCPVal); ok {
|
||||
if err := pvsc.Stop(); err != nil {
|
||||
n.Logger.Error("Error stopping priv validator socket client", "err", err)
|
||||
}
|
||||
if pvsc, ok := n.privValidator.(cmn.Service); ok {
|
||||
pvsc.Stop()
|
||||
}
|
||||
|
||||
if n.prometheusSrv != nil {
|
||||
@@ -857,6 +844,36 @@ func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) {
|
||||
db.SetSync(genesisDocKey, bytes)
|
||||
}
|
||||
|
||||
func createAndStartPrivValidatorSocketClient(
|
||||
listenAddr string,
|
||||
logger log.Logger,
|
||||
) (types.PrivValidator, error) {
|
||||
var pvsc types.PrivValidator
|
||||
|
||||
protocol, address := cmn.ProtocolAndAddress(listenAddr)
|
||||
switch protocol {
|
||||
case "unix":
|
||||
pvsc = privval.NewIPCVal(logger.With("module", "privval"), address)
|
||||
case "tcp":
|
||||
// TODO: persist this key so external signer
|
||||
// can actually authenticate us
|
||||
pvsc = privval.NewTCPVal(logger.With("module", "privval"), listenAddr, ed25519.GenPrivKey())
|
||||
default:
|
||||
return nil, fmt.Errorf(
|
||||
"Wrong listen address: expected either 'tcp' or 'unix' protocols, got %s",
|
||||
protocol,
|
||||
)
|
||||
}
|
||||
|
||||
if pvsc, ok := pvsc.(cmn.Service); ok {
|
||||
if err := pvsc.Start(); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to start")
|
||||
}
|
||||
}
|
||||
|
||||
return pvsc, nil
|
||||
}
|
||||
|
||||
// splitAndTrimEmpty slices s into all subslices separated by sep and returns a
|
||||
// slice of the string s with all leading and trailing Unicode code points
|
||||
// contained in cutset removed. If sep is empty, SplitAndTrim splits after each
|
||||
|
||||
@@ -3,23 +3,26 @@ package node
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/abci/example/kvstore"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
"github.com/tendermint/tendermint/privval"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/version"
|
||||
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
|
||||
tmtime "github.com/tendermint/tendermint/types/time"
|
||||
"github.com/tendermint/tendermint/version"
|
||||
)
|
||||
|
||||
func TestNodeStartStop(t *testing.T) {
|
||||
@@ -27,17 +30,16 @@ func TestNodeStartStop(t *testing.T) {
|
||||
|
||||
// create & start node
|
||||
n, err := DefaultNewNode(config, log.TestingLogger())
|
||||
assert.NoError(t, err, "expected no err on DefaultNewNode")
|
||||
err1 := n.Start()
|
||||
if err1 != nil {
|
||||
t.Error(err1)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
err = n.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Logf("Started node %v", n.sw.NodeInfo())
|
||||
|
||||
// wait for the node to produce a block
|
||||
blockCh := make(chan interface{})
|
||||
err = n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock, blockCh)
|
||||
assert.NoError(t, err)
|
||||
require.NoError(t, err)
|
||||
select {
|
||||
case <-blockCh:
|
||||
case <-time.After(10 * time.Second):
|
||||
@@ -89,7 +91,7 @@ func TestNodeDelayedStop(t *testing.T) {
|
||||
// create & start node
|
||||
n, err := DefaultNewNode(config, log.TestingLogger())
|
||||
n.GenesisDoc().GenesisTime = now.Add(5 * time.Second)
|
||||
assert.NoError(t, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
n.Start()
|
||||
startTime := tmtime.Now()
|
||||
@@ -101,7 +103,7 @@ func TestNodeSetAppVersion(t *testing.T) {
|
||||
|
||||
// create & start node
|
||||
n, err := DefaultNewNode(config, log.TestingLogger())
|
||||
assert.NoError(t, err, "expected no err on DefaultNewNode")
|
||||
require.NoError(t, err)
|
||||
|
||||
// default config uses the kvstore app
|
||||
var appVersion version.Protocol = kvstore.ProtocolVersion
|
||||
@@ -113,3 +115,80 @@ func TestNodeSetAppVersion(t *testing.T) {
|
||||
// check version is set in node info
|
||||
assert.Equal(t, n.nodeInfo.(p2p.DefaultNodeInfo).ProtocolVersion.App, appVersion)
|
||||
}
|
||||
|
||||
func TestNodeSetPrivValTCP(t *testing.T) {
|
||||
addr := "tcp://" + testFreeAddr(t)
|
||||
|
||||
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
|
||||
config.BaseConfig.PrivValidatorListenAddr = addr
|
||||
|
||||
rs := privval.NewRemoteSigner(
|
||||
log.TestingLogger(),
|
||||
config.ChainID(),
|
||||
addr,
|
||||
types.NewMockPV(),
|
||||
ed25519.GenPrivKey(),
|
||||
)
|
||||
privval.RemoteSignerConnDeadline(5 * time.Millisecond)(rs)
|
||||
go func() {
|
||||
err := rs.Start()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
defer rs.Stop()
|
||||
|
||||
n, err := DefaultNewNode(config, log.TestingLogger())
|
||||
require.NoError(t, err)
|
||||
assert.IsType(t, &privval.TCPVal{}, n.PrivValidator())
|
||||
}
|
||||
|
||||
// address without a protocol must result in error
|
||||
func TestPrivValidatorListenAddrNoProtocol(t *testing.T) {
|
||||
addrNoPrefix := testFreeAddr(t)
|
||||
|
||||
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
|
||||
config.BaseConfig.PrivValidatorListenAddr = addrNoPrefix
|
||||
|
||||
_, err := DefaultNewNode(config, log.TestingLogger())
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestNodeSetPrivValIPC(t *testing.T) {
|
||||
tmpfile := "/tmp/kms." + cmn.RandStr(6) + ".sock"
|
||||
defer os.Remove(tmpfile) // clean up
|
||||
|
||||
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
|
||||
config.BaseConfig.PrivValidatorListenAddr = "unix://" + tmpfile
|
||||
|
||||
rs := privval.NewIPCRemoteSigner(
|
||||
log.TestingLogger(),
|
||||
config.ChainID(),
|
||||
tmpfile,
|
||||
types.NewMockPV(),
|
||||
)
|
||||
privval.IPCRemoteSignerConnDeadline(3 * time.Second)(rs)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
n, err := DefaultNewNode(config, log.TestingLogger())
|
||||
require.NoError(t, err)
|
||||
assert.IsType(t, &privval.IPCVal{}, n.PrivValidator())
|
||||
}()
|
||||
|
||||
err := rs.Start()
|
||||
require.NoError(t, err)
|
||||
defer rs.Stop()
|
||||
|
||||
<-done
|
||||
}
|
||||
|
||||
// testFreeAddr claims a free port so we don't block on listener being ready.
|
||||
func testFreeAddr(t *testing.T) string {
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
defer ln.Close()
|
||||
|
||||
return fmt.Sprintf("127.0.0.1:%d", ln.Addr().(*net.TCPAddr).Port)
|
||||
}
|
||||
|
||||
@@ -230,3 +230,31 @@ func (info DefaultNodeInfo) NetAddress() *NetAddress {
|
||||
}
|
||||
return netAddr
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------
|
||||
// These methods are for Protobuf Compatibility
|
||||
|
||||
// Size returns the size of the amino encoding, in bytes.
|
||||
func (info *DefaultNodeInfo) Size() int {
|
||||
bs, _ := info.Marshal()
|
||||
return len(bs)
|
||||
}
|
||||
|
||||
// Marshal returns the amino encoding.
|
||||
func (info *DefaultNodeInfo) Marshal() ([]byte, error) {
|
||||
return cdc.MarshalBinaryBare(info)
|
||||
}
|
||||
|
||||
// MarshalTo calls Marshal and copies to the given buffer.
|
||||
func (info *DefaultNodeInfo) MarshalTo(data []byte) (int, error) {
|
||||
bs, err := info.Marshal()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
return copy(data, bs), nil
|
||||
}
|
||||
|
||||
// Unmarshal deserializes from amino encoded form.
|
||||
func (info *DefaultNodeInfo) Unmarshal(bs []byte) error {
|
||||
return cdc.UnmarshalBinaryBare(bs, info)
|
||||
}
|
||||
|
||||
@@ -69,6 +69,7 @@ func (rs *IPCRemoteSigner) OnStart() error {
|
||||
for {
|
||||
conn, err := rs.listener.AcceptUnix()
|
||||
if err != nil {
|
||||
rs.Logger.Error("AcceptUnix", "err", err)
|
||||
return
|
||||
}
|
||||
go rs.handleConnection(conn)
|
||||
|
||||
@@ -370,20 +370,27 @@ func TestTxSearch(t *testing.T) {
|
||||
}
|
||||
|
||||
// query by height
|
||||
result, err = c.TxSearch(fmt.Sprintf("tx.height >= %d", txHeight), true, 1, 30)
|
||||
result, err = c.TxSearch(fmt.Sprintf("tx.height=%d", txHeight), true, 1, 30)
|
||||
require.Nil(t, err, "%+v", err)
|
||||
require.Len(t, result.Txs, 1)
|
||||
|
||||
// we query for non existing tx
|
||||
// query for non existing tx
|
||||
result, err = c.TxSearch(fmt.Sprintf("tx.hash='%X'", anotherTxHash), false, 1, 30)
|
||||
require.Nil(t, err, "%+v", err)
|
||||
require.Len(t, result.Txs, 0)
|
||||
|
||||
// we query using a tag (see kvstore application)
|
||||
// query using a tag (see kvstore application)
|
||||
result, err = c.TxSearch("app.creator='Cosmoshi Netowoko'", false, 1, 30)
|
||||
require.Nil(t, err, "%+v", err)
|
||||
if len(result.Txs) == 0 {
|
||||
t.Fatal("expected a lot of transactions")
|
||||
}
|
||||
|
||||
// query using a tag (see kvstore application) and height
|
||||
result, err = c.TxSearch("app.creator='Cosmoshi Netowoko' AND tx.height<10000", true, 1, 30)
|
||||
require.Nil(t, err, "%+v", err)
|
||||
if len(result.Txs) == 0 {
|
||||
t.Fatal("expected a lot of transactions")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
@@ -104,7 +105,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
|
||||
go func() {
|
||||
for event := range ch {
|
||||
tmResult := &ctypes.ResultEvent{query, event.(tmtypes.TMEventData)}
|
||||
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(wsCtx.Codec(), wsCtx.Request.ID+"#event", tmResult))
|
||||
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(wsCtx.Codec(), rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", wsCtx.Request.ID)), tmResult))
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
@@ -99,7 +99,7 @@ func NewJSONRPCClient(remote string) *JSONRPCClient {
|
||||
}
|
||||
|
||||
func (c *JSONRPCClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) {
|
||||
request, err := types.MapToRequest(c.cdc, "jsonrpc-client", method, params)
|
||||
request, err := types.MapToRequest(c.cdc, types.JSONRPCStringID("jsonrpc-client"), method, params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -214,7 +214,7 @@ func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error {
|
||||
|
||||
// Call the given method. See Send description.
|
||||
func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error {
|
||||
request, err := types.MapToRequest(c.cdc, "ws-client", method, params)
|
||||
request, err := types.MapToRequest(c.cdc, types.JSONRPCStringID("ws-client"), method, params)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -224,7 +224,7 @@ func (c *WSClient) Call(ctx context.Context, method string, params map[string]in
|
||||
// CallWithArrayParams the given method with params in a form of array. See
|
||||
// Send description.
|
||||
func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error {
|
||||
request, err := types.ArrayToRequest(c.cdc, "ws-client", method, params)
|
||||
request, err := types.ArrayToRequest(c.cdc, types.JSONRPCStringID("ws-client"), method, params)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -103,7 +103,7 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger lo
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
b, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
WriteRPCResponseHTTP(w, types.RPCInvalidRequestError("", errors.Wrap(err, "Error reading request body")))
|
||||
WriteRPCResponseHTTP(w, types.RPCInvalidRequestError(types.JSONRPCStringID(""), errors.Wrap(err, "Error reading request body")))
|
||||
return
|
||||
}
|
||||
// if its an empty request (like from a browser),
|
||||
@@ -116,12 +116,12 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger lo
|
||||
var request types.RPCRequest
|
||||
err = json.Unmarshal(b, &request)
|
||||
if err != nil {
|
||||
WriteRPCResponseHTTP(w, types.RPCParseError("", errors.Wrap(err, "Error unmarshalling request")))
|
||||
WriteRPCResponseHTTP(w, types.RPCParseError(types.JSONRPCStringID(""), errors.Wrap(err, "Error unmarshalling request")))
|
||||
return
|
||||
}
|
||||
// A Notification is a Request object without an "id" member.
|
||||
// The Server MUST NOT reply to a Notification, including those that are within a batch request.
|
||||
if request.ID == "" {
|
||||
if request.ID == types.JSONRPCStringID("") {
|
||||
logger.Debug("HTTPJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)")
|
||||
return
|
||||
}
|
||||
@@ -255,7 +255,7 @@ func makeHTTPHandler(rpcFunc *RPCFunc, cdc *amino.Codec, logger log.Logger) func
|
||||
// Exception for websocket endpoints
|
||||
if rpcFunc.ws {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(""))
|
||||
WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(types.JSONRPCStringID("")))
|
||||
}
|
||||
}
|
||||
// All other endpoints
|
||||
@@ -263,17 +263,17 @@ func makeHTTPHandler(rpcFunc *RPCFunc, cdc *amino.Codec, logger log.Logger) func
|
||||
logger.Debug("HTTP HANDLER", "req", r)
|
||||
args, err := httpParamsToArgs(rpcFunc, cdc, r)
|
||||
if err != nil {
|
||||
WriteRPCResponseHTTP(w, types.RPCInvalidParamsError("", errors.Wrap(err, "Error converting http params to arguments")))
|
||||
WriteRPCResponseHTTP(w, types.RPCInvalidParamsError(types.JSONRPCStringID(""), errors.Wrap(err, "Error converting http params to arguments")))
|
||||
return
|
||||
}
|
||||
returns := rpcFunc.f.Call(args)
|
||||
logger.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns)
|
||||
result, err := unreflectResult(returns)
|
||||
if err != nil {
|
||||
WriteRPCResponseHTTP(w, types.RPCInternalError("", err))
|
||||
WriteRPCResponseHTTP(w, types.RPCInternalError(types.JSONRPCStringID(""), err))
|
||||
return
|
||||
}
|
||||
WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(cdc, "", result))
|
||||
WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(cdc, types.JSONRPCStringID(""), result))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -580,7 +580,7 @@ func (wsc *wsConnection) readRoutine() {
|
||||
err = fmt.Errorf("WSJSONRPC: %v", r)
|
||||
}
|
||||
wsc.Logger.Error("Panic in WSJSONRPC handler", "err", err, "stack", string(debug.Stack()))
|
||||
wsc.WriteRPCResponse(types.RPCInternalError("unknown", err))
|
||||
wsc.WriteRPCResponse(types.RPCInternalError(types.JSONRPCStringID("unknown"), err))
|
||||
go wsc.readRoutine()
|
||||
} else {
|
||||
wsc.baseConn.Close() // nolint: errcheck
|
||||
@@ -615,13 +615,13 @@ func (wsc *wsConnection) readRoutine() {
|
||||
var request types.RPCRequest
|
||||
err = json.Unmarshal(in, &request)
|
||||
if err != nil {
|
||||
wsc.WriteRPCResponse(types.RPCParseError("", errors.Wrap(err, "Error unmarshaling request")))
|
||||
wsc.WriteRPCResponse(types.RPCParseError(types.JSONRPCStringID(""), errors.Wrap(err, "Error unmarshaling request")))
|
||||
continue
|
||||
}
|
||||
|
||||
// A Notification is a Request object without an "id" member.
|
||||
// The Server MUST NOT reply to a Notification, including those that are within a batch request.
|
||||
if request.ID == "" {
|
||||
if request.ID == types.JSONRPCStringID("") {
|
||||
wsc.Logger.Debug("WSJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)")
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -47,21 +47,22 @@ func statusOK(code int) bool { return code >= 200 && code <= 299 }
|
||||
func TestRPCParams(t *testing.T) {
|
||||
mux := testMux()
|
||||
tests := []struct {
|
||||
payload string
|
||||
wantErr string
|
||||
payload string
|
||||
wantErr string
|
||||
expectedId interface{}
|
||||
}{
|
||||
// bad
|
||||
{`{"jsonrpc": "2.0", "id": "0"}`, "Method not found"},
|
||||
{`{"jsonrpc": "2.0", "method": "y", "id": "0"}`, "Method not found"},
|
||||
{`{"method": "c", "id": "0", "params": a}`, "invalid character"},
|
||||
{`{"method": "c", "id": "0", "params": ["a"]}`, "got 1"},
|
||||
{`{"method": "c", "id": "0", "params": ["a", "b"]}`, "invalid character"},
|
||||
{`{"method": "c", "id": "0", "params": [1, 1]}`, "of type string"},
|
||||
{`{"jsonrpc": "2.0", "id": "0"}`, "Method not found", types.JSONRPCStringID("0")},
|
||||
{`{"jsonrpc": "2.0", "method": "y", "id": "0"}`, "Method not found", types.JSONRPCStringID("0")},
|
||||
{`{"method": "c", "id": "0", "params": a}`, "invalid character", types.JSONRPCStringID("")}, // id not captured in JSON parsing failures
|
||||
{`{"method": "c", "id": "0", "params": ["a"]}`, "got 1", types.JSONRPCStringID("0")},
|
||||
{`{"method": "c", "id": "0", "params": ["a", "b"]}`, "invalid character", types.JSONRPCStringID("0")},
|
||||
{`{"method": "c", "id": "0", "params": [1, 1]}`, "of type string", types.JSONRPCStringID("0")},
|
||||
|
||||
// good
|
||||
{`{"jsonrpc": "2.0", "method": "c", "id": "0", "params": null}`, ""},
|
||||
{`{"method": "c", "id": "0", "params": {}}`, ""},
|
||||
{`{"method": "c", "id": "0", "params": ["a", "10"]}`, ""},
|
||||
{`{"jsonrpc": "2.0", "method": "c", "id": "0", "params": null}`, "", types.JSONRPCStringID("0")},
|
||||
{`{"method": "c", "id": "0", "params": {}}`, "", types.JSONRPCStringID("0")},
|
||||
{`{"method": "c", "id": "0", "params": ["a", "10"]}`, "", types.JSONRPCStringID("0")},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
@@ -80,7 +81,7 @@ func TestRPCParams(t *testing.T) {
|
||||
recv := new(types.RPCResponse)
|
||||
assert.Nil(t, json.Unmarshal(blob, recv), "#%d: expecting successful parsing of an RPCResponse:\nblob: %s", i, blob)
|
||||
assert.NotEqual(t, recv, new(types.RPCResponse), "#%d: not expecting a blank RPCResponse", i)
|
||||
|
||||
assert.Equal(t, tt.expectedId, recv.ID, "#%d: expected ID not matched in RPCResponse", i)
|
||||
if tt.wantErr == "" {
|
||||
assert.Nil(t, recv.Error, "#%d: not expecting an error", i)
|
||||
} else {
|
||||
@@ -91,9 +92,56 @@ func TestRPCParams(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJSONRPCID(t *testing.T) {
|
||||
mux := testMux()
|
||||
tests := []struct {
|
||||
payload string
|
||||
wantErr bool
|
||||
expectedId interface{}
|
||||
}{
|
||||
// good id
|
||||
{`{"jsonrpc": "2.0", "method": "c", "id": "0", "params": ["a", "10"]}`, false, types.JSONRPCStringID("0")},
|
||||
{`{"jsonrpc": "2.0", "method": "c", "id": "abc", "params": ["a", "10"]}`, false, types.JSONRPCStringID("abc")},
|
||||
{`{"jsonrpc": "2.0", "method": "c", "id": 0, "params": ["a", "10"]}`, false, types.JSONRPCIntID(0)},
|
||||
{`{"jsonrpc": "2.0", "method": "c", "id": 1, "params": ["a", "10"]}`, false, types.JSONRPCIntID(1)},
|
||||
{`{"jsonrpc": "2.0", "method": "c", "id": 1.3, "params": ["a", "10"]}`, false, types.JSONRPCIntID(1)},
|
||||
{`{"jsonrpc": "2.0", "method": "c", "id": -1, "params": ["a", "10"]}`, false, types.JSONRPCIntID(-1)},
|
||||
{`{"jsonrpc": "2.0", "method": "c", "id": null, "params": ["a", "10"]}`, false, nil},
|
||||
|
||||
// bad id
|
||||
{`{"jsonrpc": "2.0", "method": "c", "id": {}, "params": ["a", "10"]}`, true, nil},
|
||||
{`{"jsonrpc": "2.0", "method": "c", "id": [], "params": ["a", "10"]}`, true, nil},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
req, _ := http.NewRequest("POST", "http://localhost/", strings.NewReader(tt.payload))
|
||||
rec := httptest.NewRecorder()
|
||||
mux.ServeHTTP(rec, req)
|
||||
res := rec.Result()
|
||||
// Always expecting back a JSONRPCResponse
|
||||
assert.True(t, statusOK(res.StatusCode), "#%d: should always return 2XX", i)
|
||||
blob, err := ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
t.Errorf("#%d: err reading body: %v", i, err)
|
||||
continue
|
||||
}
|
||||
|
||||
recv := new(types.RPCResponse)
|
||||
err = json.Unmarshal(blob, recv)
|
||||
assert.Nil(t, err, "#%d: expecting successful parsing of an RPCResponse:\nblob: %s", i, blob)
|
||||
if !tt.wantErr {
|
||||
assert.NotEqual(t, recv, new(types.RPCResponse), "#%d: not expecting a blank RPCResponse", i)
|
||||
assert.Equal(t, tt.expectedId, recv.ID, "#%d: expected ID not matched in RPCResponse", i)
|
||||
assert.Nil(t, recv.Error, "#%d: not expecting an error", i)
|
||||
} else {
|
||||
assert.True(t, recv.Error.Code < 0, "#%d: not expecting a positive JSONRPC code", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRPCNotification(t *testing.T) {
|
||||
mux := testMux()
|
||||
body := strings.NewReader(`{"jsonrpc": "2.0"}`)
|
||||
body := strings.NewReader(`{"jsonrpc": "2.0", "id": ""}`)
|
||||
req, _ := http.NewRequest("POST", "http://localhost/", body)
|
||||
rec := httptest.NewRecorder()
|
||||
mux.ServeHTTP(rec, req)
|
||||
@@ -134,7 +182,7 @@ func TestWebsocketManagerHandler(t *testing.T) {
|
||||
}
|
||||
|
||||
// check basic functionality works
|
||||
req, err := types.MapToRequest(amino.NewCodec(), "TestWebsocketManager", "c", map[string]interface{}{"s": "a", "i": 10})
|
||||
req, err := types.MapToRequest(amino.NewCodec(), types.JSONRPCStringID("TestWebsocketManager"), "c", map[string]interface{}{"s": "a", "i": 10})
|
||||
require.NoError(t, err)
|
||||
err = c.WriteJSON(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -132,7 +132,7 @@ func RecoverAndLogHandler(handler http.Handler, logger log.Logger) http.Handler
|
||||
"Panic in RPC HTTP handler", "err", e, "stack",
|
||||
string(debug.Stack()),
|
||||
)
|
||||
WriteRPCResponseHTTPError(rww, http.StatusInternalServerError, types.RPCInternalError("", e.(error)))
|
||||
WriteRPCResponseHTTPError(rww, http.StatusInternalServerError, types.RPCInternalError(types.JSONRPCStringID(""), e.(error)))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
@@ -13,17 +14,75 @@ import (
|
||||
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
|
||||
)
|
||||
|
||||
// a wrapper to emulate a sum type: jsonrpcid = string | int
|
||||
// TODO: refactor when Go 2.0 arrives https://github.com/golang/go/issues/19412
|
||||
type jsonrpcid interface {
|
||||
isJSONRPCID()
|
||||
}
|
||||
|
||||
// JSONRPCStringID a wrapper for JSON-RPC string IDs
|
||||
type JSONRPCStringID string
|
||||
|
||||
func (JSONRPCStringID) isJSONRPCID() {}
|
||||
|
||||
// JSONRPCIntID a wrapper for JSON-RPC integer IDs
|
||||
type JSONRPCIntID int
|
||||
|
||||
func (JSONRPCIntID) isJSONRPCID() {}
|
||||
|
||||
func idFromInterface(idInterface interface{}) (jsonrpcid, error) {
|
||||
switch id := idInterface.(type) {
|
||||
case string:
|
||||
return JSONRPCStringID(id), nil
|
||||
case float64:
|
||||
// json.Unmarshal uses float64 for all numbers
|
||||
// (https://golang.org/pkg/encoding/json/#Unmarshal),
|
||||
// but the JSONRPC2.0 spec says the id SHOULD NOT contain
|
||||
// decimals - so we truncate the decimals here.
|
||||
return JSONRPCIntID(int(id)), nil
|
||||
default:
|
||||
typ := reflect.TypeOf(id)
|
||||
return nil, fmt.Errorf("JSON-RPC ID (%v) is of unknown type (%v)", id, typ)
|
||||
}
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
// REQUEST
|
||||
|
||||
type RPCRequest struct {
|
||||
JSONRPC string `json:"jsonrpc"`
|
||||
ID string `json:"id"`
|
||||
ID jsonrpcid `json:"id"`
|
||||
Method string `json:"method"`
|
||||
Params json.RawMessage `json:"params"` // must be map[string]interface{} or []interface{}
|
||||
}
|
||||
|
||||
func NewRPCRequest(id string, method string, params json.RawMessage) RPCRequest {
|
||||
// UnmarshalJSON custom JSON unmarshalling due to jsonrpcid being string or int
|
||||
func (request *RPCRequest) UnmarshalJSON(data []byte) error {
|
||||
unsafeReq := &struct {
|
||||
JSONRPC string `json:"jsonrpc"`
|
||||
ID interface{} `json:"id"`
|
||||
Method string `json:"method"`
|
||||
Params json.RawMessage `json:"params"` // must be map[string]interface{} or []interface{}
|
||||
}{}
|
||||
err := json.Unmarshal(data, &unsafeReq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
request.JSONRPC = unsafeReq.JSONRPC
|
||||
request.Method = unsafeReq.Method
|
||||
request.Params = unsafeReq.Params
|
||||
if unsafeReq.ID == nil {
|
||||
return nil
|
||||
}
|
||||
id, err := idFromInterface(unsafeReq.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
request.ID = id
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewRPCRequest(id jsonrpcid, method string, params json.RawMessage) RPCRequest {
|
||||
return RPCRequest{
|
||||
JSONRPC: "2.0",
|
||||
ID: id,
|
||||
@@ -36,7 +95,7 @@ func (req RPCRequest) String() string {
|
||||
return fmt.Sprintf("[%s %s]", req.ID, req.Method)
|
||||
}
|
||||
|
||||
func MapToRequest(cdc *amino.Codec, id string, method string, params map[string]interface{}) (RPCRequest, error) {
|
||||
func MapToRequest(cdc *amino.Codec, id jsonrpcid, method string, params map[string]interface{}) (RPCRequest, error) {
|
||||
var params_ = make(map[string]json.RawMessage, len(params))
|
||||
for name, value := range params {
|
||||
valueJSON, err := cdc.MarshalJSON(value)
|
||||
@@ -53,7 +112,7 @@ func MapToRequest(cdc *amino.Codec, id string, method string, params map[string]
|
||||
return request, nil
|
||||
}
|
||||
|
||||
func ArrayToRequest(cdc *amino.Codec, id string, method string, params []interface{}) (RPCRequest, error) {
|
||||
func ArrayToRequest(cdc *amino.Codec, id jsonrpcid, method string, params []interface{}) (RPCRequest, error) {
|
||||
var params_ = make([]json.RawMessage, len(params))
|
||||
for i, value := range params {
|
||||
valueJSON, err := cdc.MarshalJSON(value)
|
||||
@@ -89,12 +148,38 @@ func (err RPCError) Error() string {
|
||||
|
||||
type RPCResponse struct {
|
||||
JSONRPC string `json:"jsonrpc"`
|
||||
ID string `json:"id"`
|
||||
ID jsonrpcid `json:"id"`
|
||||
Result json.RawMessage `json:"result,omitempty"`
|
||||
Error *RPCError `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
func NewRPCSuccessResponse(cdc *amino.Codec, id string, res interface{}) RPCResponse {
|
||||
// UnmarshalJSON custom JSON unmarshalling due to jsonrpcid being string or int
|
||||
func (response *RPCResponse) UnmarshalJSON(data []byte) error {
|
||||
unsafeResp := &struct {
|
||||
JSONRPC string `json:"jsonrpc"`
|
||||
ID interface{} `json:"id"`
|
||||
Result json.RawMessage `json:"result,omitempty"`
|
||||
Error *RPCError `json:"error,omitempty"`
|
||||
}{}
|
||||
err := json.Unmarshal(data, &unsafeResp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
response.JSONRPC = unsafeResp.JSONRPC
|
||||
response.Error = unsafeResp.Error
|
||||
response.Result = unsafeResp.Result
|
||||
if unsafeResp.ID == nil {
|
||||
return nil
|
||||
}
|
||||
id, err := idFromInterface(unsafeResp.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
response.ID = id
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewRPCSuccessResponse(cdc *amino.Codec, id jsonrpcid, res interface{}) RPCResponse {
|
||||
var rawMsg json.RawMessage
|
||||
|
||||
if res != nil {
|
||||
@@ -109,7 +194,7 @@ func NewRPCSuccessResponse(cdc *amino.Codec, id string, res interface{}) RPCResp
|
||||
return RPCResponse{JSONRPC: "2.0", ID: id, Result: rawMsg}
|
||||
}
|
||||
|
||||
func NewRPCErrorResponse(id string, code int, msg string, data string) RPCResponse {
|
||||
func NewRPCErrorResponse(id jsonrpcid, code int, msg string, data string) RPCResponse {
|
||||
return RPCResponse{
|
||||
JSONRPC: "2.0",
|
||||
ID: id,
|
||||
@@ -124,27 +209,27 @@ func (resp RPCResponse) String() string {
|
||||
return fmt.Sprintf("[%s %s]", resp.ID, resp.Error)
|
||||
}
|
||||
|
||||
func RPCParseError(id string, err error) RPCResponse {
|
||||
func RPCParseError(id jsonrpcid, err error) RPCResponse {
|
||||
return NewRPCErrorResponse(id, -32700, "Parse error. Invalid JSON", err.Error())
|
||||
}
|
||||
|
||||
func RPCInvalidRequestError(id string, err error) RPCResponse {
|
||||
func RPCInvalidRequestError(id jsonrpcid, err error) RPCResponse {
|
||||
return NewRPCErrorResponse(id, -32600, "Invalid Request", err.Error())
|
||||
}
|
||||
|
||||
func RPCMethodNotFoundError(id string) RPCResponse {
|
||||
func RPCMethodNotFoundError(id jsonrpcid) RPCResponse {
|
||||
return NewRPCErrorResponse(id, -32601, "Method not found", "")
|
||||
}
|
||||
|
||||
func RPCInvalidParamsError(id string, err error) RPCResponse {
|
||||
func RPCInvalidParamsError(id jsonrpcid, err error) RPCResponse {
|
||||
return NewRPCErrorResponse(id, -32602, "Invalid params", err.Error())
|
||||
}
|
||||
|
||||
func RPCInternalError(id string, err error) RPCResponse {
|
||||
func RPCInternalError(id jsonrpcid, err error) RPCResponse {
|
||||
return NewRPCErrorResponse(id, -32603, "Internal error", err.Error())
|
||||
}
|
||||
|
||||
func RPCServerError(id string, err error) RPCResponse {
|
||||
func RPCServerError(id jsonrpcid, err error) RPCResponse {
|
||||
return NewRPCErrorResponse(id, -32000, "Server error", err.Error())
|
||||
}
|
||||
|
||||
|
||||
@@ -15,24 +15,57 @@ type SampleResult struct {
|
||||
Value string
|
||||
}
|
||||
|
||||
type responseTest struct {
|
||||
id jsonrpcid
|
||||
expected string
|
||||
}
|
||||
|
||||
var responseTests = []responseTest{
|
||||
{JSONRPCStringID("1"), `"1"`},
|
||||
{JSONRPCStringID("alphabet"), `"alphabet"`},
|
||||
{JSONRPCStringID(""), `""`},
|
||||
{JSONRPCStringID("àáâ"), `"àáâ"`},
|
||||
{JSONRPCIntID(-1), "-1"},
|
||||
{JSONRPCIntID(0), "0"},
|
||||
{JSONRPCIntID(1), "1"},
|
||||
{JSONRPCIntID(100), "100"},
|
||||
}
|
||||
|
||||
func TestResponses(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
cdc := amino.NewCodec()
|
||||
for _, tt := range responseTests {
|
||||
jsonid := tt.id
|
||||
a := NewRPCSuccessResponse(cdc, jsonid, &SampleResult{"hello"})
|
||||
b, _ := json.Marshal(a)
|
||||
s := fmt.Sprintf(`{"jsonrpc":"2.0","id":%v,"result":{"Value":"hello"}}`, tt.expected)
|
||||
assert.Equal(string(s), string(b))
|
||||
|
||||
a := NewRPCSuccessResponse(cdc, "1", &SampleResult{"hello"})
|
||||
b, _ := json.Marshal(a)
|
||||
s := `{"jsonrpc":"2.0","id":"1","result":{"Value":"hello"}}`
|
||||
assert.Equal(string(s), string(b))
|
||||
d := RPCParseError(jsonid, errors.New("Hello world"))
|
||||
e, _ := json.Marshal(d)
|
||||
f := fmt.Sprintf(`{"jsonrpc":"2.0","id":%v,"error":{"code":-32700,"message":"Parse error. Invalid JSON","data":"Hello world"}}`, tt.expected)
|
||||
assert.Equal(string(f), string(e))
|
||||
|
||||
d := RPCParseError("1", errors.New("Hello world"))
|
||||
e, _ := json.Marshal(d)
|
||||
f := `{"jsonrpc":"2.0","id":"1","error":{"code":-32700,"message":"Parse error. Invalid JSON","data":"Hello world"}}`
|
||||
assert.Equal(string(f), string(e))
|
||||
g := RPCMethodNotFoundError(jsonid)
|
||||
h, _ := json.Marshal(g)
|
||||
i := fmt.Sprintf(`{"jsonrpc":"2.0","id":%v,"error":{"code":-32601,"message":"Method not found"}}`, tt.expected)
|
||||
assert.Equal(string(h), string(i))
|
||||
}
|
||||
}
|
||||
|
||||
g := RPCMethodNotFoundError("2")
|
||||
h, _ := json.Marshal(g)
|
||||
i := `{"jsonrpc":"2.0","id":"2","error":{"code":-32601,"message":"Method not found"}}`
|
||||
assert.Equal(string(h), string(i))
|
||||
func TestUnmarshallResponses(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
cdc := amino.NewCodec()
|
||||
for _, tt := range responseTests {
|
||||
response := &RPCResponse{}
|
||||
err := json.Unmarshal([]byte(fmt.Sprintf(`{"jsonrpc":"2.0","id":%v,"result":{"Value":"hello"}}`, tt.expected)), response)
|
||||
assert.Nil(err)
|
||||
a := NewRPCSuccessResponse(cdc, tt.id, &SampleResult{"hello"})
|
||||
assert.Equal(*response, a)
|
||||
}
|
||||
response := &RPCResponse{}
|
||||
err := json.Unmarshal([]byte(`{"jsonrpc":"2.0","id":true,"result":{"Value":"hello"}}`), response)
|
||||
assert.NotNil(err)
|
||||
}
|
||||
|
||||
func TestRPCError(t *testing.T) {
|
||||
|
||||
@@ -226,8 +226,9 @@ func execBlockOnProxyApp(
|
||||
|
||||
commitInfo, byzVals := getBeginBlockValidatorInfo(block, lastValSet, stateDB)
|
||||
|
||||
// Begin block.
|
||||
_, err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
|
||||
// Begin block
|
||||
var err error
|
||||
abciResponses.BeginBlock, err = proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
|
||||
Hash: block.Hash(),
|
||||
Header: types.TM2PB.Header(&block.Header),
|
||||
LastCommitInfo: commitInfo,
|
||||
@@ -417,8 +418,16 @@ func updateState(
|
||||
// Fire TxEvent for every tx.
|
||||
// NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
|
||||
func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) {
|
||||
eventBus.PublishEventNewBlock(types.EventDataNewBlock{block})
|
||||
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header})
|
||||
eventBus.PublishEventNewBlock(types.EventDataNewBlock{
|
||||
Block: block,
|
||||
ResultBeginBlock: *abciResponses.BeginBlock,
|
||||
ResultEndBlock: *abciResponses.EndBlock,
|
||||
})
|
||||
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
|
||||
Header: block.Header,
|
||||
ResultBeginBlock: *abciResponses.BeginBlock,
|
||||
ResultEndBlock: *abciResponses.EndBlock,
|
||||
})
|
||||
|
||||
for i, tx := range block.Data.Txs {
|
||||
eventBus.PublishEventTx(types.EventDataTx{types.TxResult{
|
||||
|
||||
@@ -64,7 +64,8 @@ type State struct {
|
||||
// Validators are persisted to the database separately every time they change,
|
||||
// so we can query for historical validator sets.
|
||||
// Note that if s.LastBlockHeight causes a valset change,
|
||||
// we set s.LastHeightValidatorsChanged = s.LastBlockHeight + 1
|
||||
// we set s.LastHeightValidatorsChanged = s.LastBlockHeight + 1 + 1
|
||||
// Extra +1 due to nextValSet delay.
|
||||
NextValidators *types.ValidatorSet
|
||||
Validators *types.ValidatorSet
|
||||
LastValidators *types.ValidatorSet
|
||||
|
||||
@@ -3,9 +3,10 @@ package state
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"testing"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
@@ -260,6 +261,27 @@ func TestOneValidatorChangesSaveLoad(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestStoreLoadValidatorsIncrementsAccum(t *testing.T) {
|
||||
const valSetSize = 2
|
||||
tearDown, stateDB, state := setupTestCase(t)
|
||||
state.Validators = genValSet(valSetSize)
|
||||
state.NextValidators = state.Validators.CopyIncrementAccum(1)
|
||||
SaveState(stateDB, state)
|
||||
defer tearDown(t)
|
||||
|
||||
nextHeight := state.LastBlockHeight + 1
|
||||
|
||||
v0, err := LoadValidators(stateDB, nextHeight)
|
||||
assert.Nil(t, err)
|
||||
acc0 := v0.Validators[0].Accum
|
||||
|
||||
v1, err := LoadValidators(stateDB, nextHeight+1)
|
||||
assert.Nil(t, err)
|
||||
acc1 := v1.Validators[0].Accum
|
||||
|
||||
assert.NotEqual(t, acc1, acc0, "expected Accum value to change between heights")
|
||||
}
|
||||
|
||||
// TestValidatorChangesSaveLoad tests saving and loading a validator set with
|
||||
// changes.
|
||||
func TestManyValidatorChangesSaveLoad(t *testing.T) {
|
||||
|
||||
@@ -89,7 +89,9 @@ func saveState(db dbm.DB, state State, key []byte) {
|
||||
nextHeight := state.LastBlockHeight + 1
|
||||
// If first block, save validators for block 1.
|
||||
if nextHeight == 1 {
|
||||
lastHeightVoteChanged := int64(1) // Due to Tendermint validator set changes being delayed 1 block.
|
||||
// This extra logic due to Tendermint validator set changes being delayed 1 block.
|
||||
// It may get overwritten due to InitChain validator updates.
|
||||
lastHeightVoteChanged := int64(1)
|
||||
saveValidatorsInfo(db, nextHeight, lastHeightVoteChanged, state.Validators)
|
||||
}
|
||||
// Save next validators.
|
||||
@@ -105,8 +107,9 @@ func saveState(db dbm.DB, state State, key []byte) {
|
||||
// of the various ABCI calls during block processing.
|
||||
// It is persisted to disk for each height before calling Commit.
|
||||
type ABCIResponses struct {
|
||||
DeliverTx []*abci.ResponseDeliverTx
|
||||
EndBlock *abci.ResponseEndBlock
|
||||
DeliverTx []*abci.ResponseDeliverTx
|
||||
EndBlock *abci.ResponseEndBlock
|
||||
BeginBlock *abci.ResponseBeginBlock
|
||||
}
|
||||
|
||||
// NewABCIResponses returns a new ABCIResponses
|
||||
@@ -191,12 +194,14 @@ func LoadValidators(db dbm.DB, height int64) (*types.ValidatorSet, error) {
|
||||
),
|
||||
)
|
||||
}
|
||||
valInfo2.ValidatorSet.IncrementAccum(int(height - valInfo.LastHeightChanged)) // mutate
|
||||
valInfo = valInfo2
|
||||
}
|
||||
|
||||
return valInfo.ValidatorSet, nil
|
||||
}
|
||||
|
||||
// CONTRACT: Returned ValidatorsInfo can be mutated.
|
||||
func loadValidatorsInfo(db dbm.DB, height int64) *ValidatorsInfo {
|
||||
buf := db.Get(calcValidatorsKey(height))
|
||||
if len(buf) == 0 {
|
||||
@@ -215,18 +220,22 @@ func loadValidatorsInfo(db dbm.DB, height int64) *ValidatorsInfo {
|
||||
return v
|
||||
}
|
||||
|
||||
// saveValidatorsInfo persists the validator set for the next block to disk.
|
||||
// saveValidatorsInfo persists the validator set.
|
||||
// `height` is the effective height for which the validator is responsible for signing.
|
||||
// It should be called from s.Save(), right before the state itself is persisted.
|
||||
// If the validator set did not change after processing the latest block,
|
||||
// only the last height for which the validators changed is persisted.
|
||||
func saveValidatorsInfo(db dbm.DB, nextHeight, changeHeight int64, valSet *types.ValidatorSet) {
|
||||
valInfo := &ValidatorsInfo{
|
||||
LastHeightChanged: changeHeight,
|
||||
func saveValidatorsInfo(db dbm.DB, height, lastHeightChanged int64, valSet *types.ValidatorSet) {
|
||||
if lastHeightChanged > height {
|
||||
panic("LastHeightChanged cannot be greater than ValidatorsInfo height")
|
||||
}
|
||||
if changeHeight == nextHeight {
|
||||
valInfo := &ValidatorsInfo{
|
||||
LastHeightChanged: lastHeightChanged,
|
||||
}
|
||||
if lastHeightChanged == height {
|
||||
valInfo.ValidatorSet = valSet
|
||||
}
|
||||
db.Set(calcValidatorsKey(nextHeight), valInfo.Bytes())
|
||||
db.Set(calcValidatorsKey(height), valInfo.Bytes())
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
@@ -207,8 +207,11 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
|
||||
i++
|
||||
}
|
||||
|
||||
// sort by height by default
|
||||
// sort by height & index by default
|
||||
sort.Slice(results, func(i, j int) bool {
|
||||
if results[i].Height == results[j].Height {
|
||||
return results[i].Index < results[j].Index
|
||||
}
|
||||
return results[i].Height < results[j].Height
|
||||
})
|
||||
|
||||
@@ -225,9 +228,10 @@ func lookForHash(conditions []query.Condition) (hash []byte, err error, ok bool)
|
||||
return
|
||||
}
|
||||
|
||||
// lookForHeight returns a height if there is an "height=X" condition.
|
||||
func lookForHeight(conditions []query.Condition) (height int64) {
|
||||
for _, c := range conditions {
|
||||
if c.Tag == types.TxHeightKey {
|
||||
if c.Tag == types.TxHeightKey && c.Op == query.OpEqual {
|
||||
return c.Operand.(int64)
|
||||
}
|
||||
}
|
||||
@@ -408,9 +412,9 @@ LOOP:
|
||||
func startKey(c query.Condition, height int64) []byte {
|
||||
var key string
|
||||
if height > 0 {
|
||||
key = fmt.Sprintf("%s/%v/%d", c.Tag, c.Operand, height)
|
||||
key = fmt.Sprintf("%s/%v/%d/", c.Tag, c.Operand, height)
|
||||
} else {
|
||||
key = fmt.Sprintf("%s/%v", c.Tag, c.Operand)
|
||||
key = fmt.Sprintf("%s/%v/", c.Tag, c.Operand)
|
||||
}
|
||||
return []byte(key)
|
||||
}
|
||||
|
||||
@@ -73,6 +73,8 @@ func TestTxSearch(t *testing.T) {
|
||||
{"account.number = 1 AND account.owner = 'Ivan'", 1},
|
||||
// search by exact match (two tags)
|
||||
{"account.number = 1 AND account.owner = 'Vlad'", 0},
|
||||
// search using a prefix of the stored value
|
||||
{"account.owner = 'Iv'", 0},
|
||||
// search by range
|
||||
{"account.number >= 1 AND account.number <= 5", 1},
|
||||
// search by range (lower bound)
|
||||
@@ -133,6 +135,7 @@ func TestTxSearchMultipleTxs(t *testing.T) {
|
||||
})
|
||||
txResult.Tx = types.Tx("Bob's account")
|
||||
txResult.Height = 2
|
||||
txResult.Index = 1
|
||||
err := indexer.Index(txResult)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -142,14 +145,26 @@ func TestTxSearchMultipleTxs(t *testing.T) {
|
||||
})
|
||||
txResult2.Tx = types.Tx("Alice's account")
|
||||
txResult2.Height = 1
|
||||
txResult2.Index = 2
|
||||
|
||||
err = indexer.Index(txResult2)
|
||||
require.NoError(t, err)
|
||||
|
||||
// indexed third (to test the order of transactions)
|
||||
txResult3 := txResultWithTags([]cmn.KVPair{
|
||||
{Key: []byte("account.number"), Value: []byte("3")},
|
||||
})
|
||||
txResult3.Tx = types.Tx("Jack's account")
|
||||
txResult3.Height = 1
|
||||
txResult3.Index = 1
|
||||
err = indexer.Index(txResult3)
|
||||
require.NoError(t, err)
|
||||
|
||||
results, err := indexer.Search(query.MustParse("account.number >= 1"))
|
||||
assert.NoError(t, err)
|
||||
|
||||
require.Len(t, results, 2)
|
||||
assert.Equal(t, []*types.TxResult{txResult2, txResult}, results)
|
||||
require.Len(t, results, 3)
|
||||
assert.Equal(t, []*types.TxResult{txResult3, txResult2, txResult}, results)
|
||||
}
|
||||
|
||||
func TestIndexAllTags(t *testing.T) {
|
||||
|
||||
@@ -191,7 +191,7 @@ func (t *transacter) sendLoop(connIndex int) {
|
||||
c.SetWriteDeadline(now.Add(sendTimeout))
|
||||
err = c.WriteJSON(rpctypes.RPCRequest{
|
||||
JSONRPC: "2.0",
|
||||
ID: "tm-bench",
|
||||
ID: rpctypes.JSONRPCStringID("tm-bench"),
|
||||
Method: t.BroadcastTxMethod,
|
||||
Params: rawParamsJSON,
|
||||
})
|
||||
|
||||
@@ -34,7 +34,7 @@ func TestNodeNewBlockReceived(t *testing.T) {
|
||||
n.SendBlocksTo(blockCh)
|
||||
|
||||
blockHeader := tmtypes.Header{Height: 5}
|
||||
emMock.Call("eventCallback", &em.EventMetric{}, tmtypes.EventDataNewBlockHeader{blockHeader})
|
||||
emMock.Call("eventCallback", &em.EventMetric{}, tmtypes.EventDataNewBlockHeader{Header: blockHeader})
|
||||
|
||||
assert.Equal(t, int64(5), n.Height)
|
||||
assert.Equal(t, blockHeader, <-blockCh)
|
||||
|
||||
@@ -275,6 +275,28 @@ func (b *Block) StringShort() string {
|
||||
return fmt.Sprintf("Block#%v", b.Hash())
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------
|
||||
// These methods are for Protobuf Compatibility
|
||||
|
||||
// Marshal returns the amino encoding.
|
||||
func (b *Block) Marshal() ([]byte, error) {
|
||||
return cdc.MarshalBinaryBare(b)
|
||||
}
|
||||
|
||||
// MarshalTo calls Marshal and copies to the given buffer.
|
||||
func (b *Block) MarshalTo(data []byte) (int, error) {
|
||||
bs, err := b.Marshal()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
return copy(data, bs), nil
|
||||
}
|
||||
|
||||
// Unmarshal deserializes from amino encoded form.
|
||||
func (b *Block) Unmarshal(bs []byte) error {
|
||||
return cdc.UnmarshalBinaryBare(bs, b)
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
// MaxDataBytes returns the maximum size of block's data.
|
||||
|
||||
@@ -13,3 +13,31 @@ func NewBlockMeta(block *Block, blockParts *PartSet) *BlockMeta {
|
||||
Header: block.Header,
|
||||
}
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------
|
||||
// These methods are for Protobuf Compatibility
|
||||
|
||||
// Size returns the size of the amino encoding, in bytes.
|
||||
func (bm *BlockMeta) Size() int {
|
||||
bs, _ := bm.Marshal()
|
||||
return len(bs)
|
||||
}
|
||||
|
||||
// Marshal returns the amino encoding.
|
||||
func (bm *BlockMeta) Marshal() ([]byte, error) {
|
||||
return cdc.MarshalBinaryBare(bm)
|
||||
}
|
||||
|
||||
// MarshalTo calls Marshal and copies to the given buffer.
|
||||
func (bm *BlockMeta) MarshalTo(data []byte) (int, error) {
|
||||
bs, err := bm.Marshal()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
return copy(data, bs), nil
|
||||
}
|
||||
|
||||
// Unmarshal deserializes from amino encoded form.
|
||||
func (bm *BlockMeta) Unmarshal(bs []byte) error {
|
||||
return cdc.UnmarshalBinaryBare(bs, bm)
|
||||
}
|
||||
|
||||
@@ -71,12 +71,48 @@ func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *EventBus) validateAndStringifyTags(tags []cmn.KVPair, logger log.Logger) map[string]string {
|
||||
result := make(map[string]string)
|
||||
for _, tag := range tags {
|
||||
// basic validation
|
||||
if len(tag.Key) == 0 {
|
||||
logger.Debug("Got tag with an empty key (skipping)", "tag", tag)
|
||||
continue
|
||||
}
|
||||
result[string(tag.Key)] = string(tag.Value)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error {
|
||||
return b.Publish(EventNewBlock, data)
|
||||
// no explicit deadline for publishing events
|
||||
ctx := context.Background()
|
||||
|
||||
resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...)
|
||||
tags := b.validateAndStringifyTags(resultTags, b.Logger.With("block", data.Block.StringShort()))
|
||||
|
||||
// add predefined tags
|
||||
logIfTagExists(EventTypeKey, tags, b.Logger)
|
||||
tags[EventTypeKey] = EventNewBlock
|
||||
|
||||
b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error {
|
||||
return b.Publish(EventNewBlockHeader, data)
|
||||
// no explicit deadline for publishing events
|
||||
ctx := context.Background()
|
||||
|
||||
resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...)
|
||||
// TODO: Create StringShort method for Header and use it in logger.
|
||||
tags := b.validateAndStringifyTags(resultTags, b.Logger.With("header", data.Header))
|
||||
|
||||
// add predefined tags
|
||||
logIfTagExists(EventTypeKey, tags, b.Logger)
|
||||
tags[EventTypeKey] = EventNewBlockHeader
|
||||
|
||||
b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *EventBus) PublishEventVote(data EventDataVote) error {
|
||||
@@ -94,17 +130,7 @@ func (b *EventBus) PublishEventTx(data EventDataTx) error {
|
||||
// no explicit deadline for publishing events
|
||||
ctx := context.Background()
|
||||
|
||||
tags := make(map[string]string)
|
||||
|
||||
// validate and fill tags from tx result
|
||||
for _, tag := range data.Result.Tags {
|
||||
// basic validation
|
||||
if len(tag.Key) == 0 {
|
||||
b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", data.Tx)
|
||||
continue
|
||||
}
|
||||
tags[string(tag.Key)] = string(tag.Value)
|
||||
}
|
||||
tags := b.validateAndStringifyTags(data.Result.Tags, b.Logger.With("tx", data.Tx))
|
||||
|
||||
// add predefined tags
|
||||
logIfTagExists(EventTypeKey, tags, b.Logger)
|
||||
|
||||
@@ -58,6 +58,90 @@ func TestEventBusPublishEventTx(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBusPublishEventNewBlock(t *testing.T) {
|
||||
eventBus := NewEventBus()
|
||||
err := eventBus.Start()
|
||||
require.NoError(t, err)
|
||||
defer eventBus.Stop()
|
||||
|
||||
block := MakeBlock(0, []Tx{}, nil, []Evidence{})
|
||||
resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
|
||||
resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}}
|
||||
|
||||
txEventsCh := make(chan interface{})
|
||||
|
||||
// PublishEventNewBlock adds the tm.event tag, so the query below should work
|
||||
query := "tm.event='NewBlock' AND baz=1 AND foz=2"
|
||||
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
|
||||
require.NoError(t, err)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for e := range txEventsCh {
|
||||
edt := e.(EventDataNewBlock)
|
||||
assert.Equal(t, block, edt.Block)
|
||||
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
|
||||
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
|
||||
close(done)
|
||||
}
|
||||
}()
|
||||
|
||||
err = eventBus.PublishEventNewBlock(EventDataNewBlock{
|
||||
Block: block,
|
||||
ResultBeginBlock: resultBeginBlock,
|
||||
ResultEndBlock: resultEndBlock,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("did not receive a block after 1 sec.")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBusPublishEventNewBlockHeader(t *testing.T) {
|
||||
eventBus := NewEventBus()
|
||||
err := eventBus.Start()
|
||||
require.NoError(t, err)
|
||||
defer eventBus.Stop()
|
||||
|
||||
block := MakeBlock(0, []Tx{}, nil, []Evidence{})
|
||||
resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
|
||||
resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}}
|
||||
|
||||
txEventsCh := make(chan interface{})
|
||||
|
||||
// PublishEventNewBlockHeader adds the tm.event tag, so the query below should work
|
||||
query := "tm.event='NewBlockHeader' AND baz=1 AND foz=2"
|
||||
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
|
||||
require.NoError(t, err)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for e := range txEventsCh {
|
||||
edt := e.(EventDataNewBlockHeader)
|
||||
assert.Equal(t, block.Header, edt.Header)
|
||||
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
|
||||
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
|
||||
close(done)
|
||||
}
|
||||
}()
|
||||
|
||||
err = eventBus.PublishEventNewBlockHeader(EventDataNewBlockHeader{
|
||||
Header: block.Header,
|
||||
ResultBeginBlock: resultBeginBlock,
|
||||
ResultEndBlock: resultEndBlock,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("did not receive a block header after 1 sec.")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventBusPublish(t *testing.T) {
|
||||
eventBus := NewEventBus()
|
||||
err := eventBus.Start()
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
amino "github.com/tendermint/go-amino"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
|
||||
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
|
||||
)
|
||||
@@ -56,11 +57,17 @@ func RegisterEventDatas(cdc *amino.Codec) {
|
||||
|
||||
type EventDataNewBlock struct {
|
||||
Block *Block `json:"block"`
|
||||
|
||||
ResultBeginBlock abci.ResponseBeginBlock `json:"result_begin_block"`
|
||||
ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"`
|
||||
}
|
||||
|
||||
// light weight event for benchmarking
|
||||
type EventDataNewBlockHeader struct {
|
||||
Header Header `json:"header"`
|
||||
|
||||
ResultBeginBlock abci.ResponseBeginBlock `json:"result_begin_block"`
|
||||
ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"`
|
||||
}
|
||||
|
||||
// All txs fire EventDataTx
|
||||
@@ -100,7 +107,7 @@ type EventDataCompleteProposal struct {
|
||||
Round int `json:"round"`
|
||||
Step string `json:"step"`
|
||||
|
||||
BlockID BlockID `json:"block_id"`
|
||||
BlockID BlockID `json:"block_id"`
|
||||
}
|
||||
|
||||
type EventDataVote struct {
|
||||
|
||||
@@ -200,6 +200,9 @@ func (ps *PartSet) Total() int {
|
||||
}
|
||||
|
||||
func (ps *PartSet) AddPart(part *Part) (bool, error) {
|
||||
if ps == nil {
|
||||
return false, nil
|
||||
}
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
|
||||
|
||||
@@ -62,7 +62,11 @@ func (vals *ValidatorSet) CopyIncrementAccum(times int) *ValidatorSet {
|
||||
|
||||
// IncrementAccum increments accum of each validator and updates the
|
||||
// proposer. Panics if validator set is empty.
|
||||
// `times` must be positive.
|
||||
func (vals *ValidatorSet) IncrementAccum(times int) {
|
||||
if times <= 0 {
|
||||
panic("Cannot call IncrementAccum with non-positive times")
|
||||
}
|
||||
|
||||
// Add VotingPower * times to each validator and order into heap.
|
||||
validatorsHeap := cmn.NewHeap()
|
||||
|
||||
@@ -86,6 +86,19 @@ func TestCopy(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Test that IncrementAccum requires positive times.
|
||||
func TestIncrementAccumPositiveTimes(t *testing.T) {
|
||||
vset := NewValidatorSet([]*Validator{
|
||||
newValidator([]byte("foo"), 1000),
|
||||
newValidator([]byte("bar"), 300),
|
||||
newValidator([]byte("baz"), 330),
|
||||
})
|
||||
|
||||
assert.Panics(t, func() { vset.IncrementAccum(-1) })
|
||||
assert.Panics(t, func() { vset.IncrementAccum(0) })
|
||||
vset.IncrementAccum(1)
|
||||
}
|
||||
|
||||
func BenchmarkValidatorSetCopy(b *testing.B) {
|
||||
b.StopTimer()
|
||||
vset := NewValidatorSet([]*Validator{})
|
||||
@@ -239,7 +252,7 @@ func TestProposerSelection3(t *testing.T) {
|
||||
mod := (cmn.RandInt() % 5) + 1
|
||||
if cmn.RandInt()%mod > 0 {
|
||||
// sometimes its up to 5
|
||||
times = cmn.RandInt() % 5
|
||||
times = (cmn.RandInt() % 4) + 1
|
||||
}
|
||||
vset.IncrementAccum(times)
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ const (
|
||||
// TMCoreSemVer is the current version of Tendermint Core.
|
||||
// It's the Semantic Version of the software.
|
||||
// Must be a string because scripts like dist.sh read this file.
|
||||
TMCoreSemVer = "0.26.3"
|
||||
TMCoreSemVer = "0.26.4"
|
||||
|
||||
// ABCISemVer is the semantic version of the ABCI library
|
||||
ABCISemVer = "0.15.0"
|
||||
|
||||
Reference in New Issue
Block a user