Compare commits


1 Commit

Author: Callum Waters
SHA1: e6925d5dff
Message: create valset-change tool
Date: 2022-10-07 11:38:52 +02:00
78 changed files with 1492 additions and 1581 deletions


@@ -90,10 +90,8 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [crypto] \#6120 Implement batch verification interface for ed25519 and sr25519. (@marbar3778 & @Yawning)
- [types] \#6120 use batch verification for verifying commits signatures. (@marbar3778 & @cmwaters & @Yawning)
- If the key type supports the batch verification API it will try to batch verify. If the verification fails we will single verify each signature.
- [state] \#9505 Added logic so when pruning, the evidence period is taken into consideration and only deletes unnecessary data (@samricotta)
### BUG FIXES
- [consensus] \#9229 fix round number of `enterPropose` when handling `RoundStepNewRound` timeout. (@fatcat22)
- [docker] \#9073 enable cross platform build using docker buildx
- [blocksync] \#9518 handle the case when the sending queue is full: retry block request after a timeout


@@ -12,7 +12,7 @@ and hence to Tendermint.
* We are committed to providing a friendly, safe and welcoming environment for
all, regardless of level of experience, gender, gender identity and
expression, sexual orientation, disability, personal appearance, body size,
race, ethnicity, age, religion, nationality, or other similar characteristics.
race, ethnicity, age, religion, nationality, or other similar characteristic.
* On Slack, please avoid using overtly sexual nicknames or other nicknames that
might detract from a friendly, safe and welcoming environment for all.


@@ -12,7 +12,7 @@ landing changes in `main`.
All work on the code base should be motivated by a [Github
Issue](https://github.com/tendermint/tendermint/issues).
[Search](https://github.com/tendermint/tendermint/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22)
is a good place to start when looking for places to contribute. If you
is a good place start when looking for places to contribute. If you
would like to work on an issue which already exists, please indicate so
by leaving a comment.
@@ -213,7 +213,7 @@ Changes with multiple classifications should be doubly included (eg. a bug fix
that is also a breaking change should be recorded under both).
Breaking changes are further subdivided according to the APIs/users they impact.
Any change that affects multiple APIs/users should be recorded multiply - for
Any change that effects multiple APIs/users should be recorded multiply - for
instance, a change to the `Blockchain Protocol` that removes a field from the
header should also be recorded under `CLI/RPC/Config` since the field will be
removed from the header in RPC responses as well.
@@ -247,7 +247,7 @@ To begin contributing, create a development branch either on `github.com/tenderm
Make changes, and before submitting a pull request, update the `CHANGELOG_PENDING.md` to record your change. Also, run either `git rebase` or `git merge` on top of the latest `main`. (Since pull requests are squash-merged, either is fine!)
Update the `UPGRADING.md` if the change you've made is breaking and the
instructions should be in place for a user on how he/she can upgrade its
instructions should be in place for a user on how he/she can upgrade it's
software (ABCI application, Tendermint-based blockchain, light client, wallet).
Once you have submitted a pull request label the pull request with either `R:minor`, if the change should be included in the next minor release, or `R:major`, if the change is meant for a major release.


@@ -4,8 +4,14 @@ OUTPUT?=$(BUILDDIR)/tendermint
BUILD_TAGS?=tendermint
COMMIT_HASH := $(shell git rev-parse --short HEAD)
LD_FLAGS = -X github.com/tendermint/tendermint/version.TMGitCommitHash=$(COMMIT_HASH)
# If building a release, please checkout the version tag to get the correct version setting
ifneq ($(shell git symbolic-ref -q --short HEAD),)
VERSION := unreleased-$(shell git symbolic-ref -q --short HEAD)-$(shell git rev-parse HEAD)
else
VERSION := $(shell git describe)
endif
LD_FLAGS = -X github.com/tendermint/tendermint/version.TMCoreSemVer=$(VERSION)
BUILD_FLAGS = -mod=readonly -ldflags "$(LD_FLAGS)"
HTTPS_GIT := https://github.com/tendermint/tendermint.git
CGO_ENABLED ?= 0
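
For context, the VERSION/LD_FLAGS machinery above relies on Go's `-X` linker flag to stamp a string variable at build time. Below is a minimal, self-contained sketch of that mechanism; the package and variable here are simplified stand-ins for `github.com/tendermint/tendermint/version.TMCoreSemVer`, not the real package:

```go
// Build with, e.g.:
//   go build -ldflags "-X main.TMCoreSemVer=v0.34.22-unreleased" .
package main

import "fmt"

// TMCoreSemVer is deliberately a placeholder in source; the linker
// overwrites it when -ldflags "-X main.TMCoreSemVer=<version>" is passed,
// which is what LD_FLAGS does for the real version package.
var TMCoreSemVer = "unset"

func main() {
	fmt.Println("version:", TMCoreSemVer)
}
```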


@@ -79,7 +79,7 @@ func NewApplication() *Application {
func (app *Application) Info(req types.RequestInfo) (resInfo types.ResponseInfo) {
return types.ResponseInfo{
Data: fmt.Sprintf("{\"size\":%v}", app.state.Size),
Version: version.ABCISemVer,
Version: version.ABCIVersion,
AppVersion: ProtocolVersion,
LastBlockHeight: app.state.Height,
LastBlockAppHash: app.state.AppHash,


@@ -6,4 +6,4 @@ import (
// TODO: eliminate this after some version refactor
const Version = version.ABCISemVer
const Version = version.ABCIVersion


@@ -19,6 +19,58 @@ const (
BlockResponseMessageFieldKeySize
)
// EncodeMsg encodes a Protobuf message
func EncodeMsg(pb proto.Message) ([]byte, error) {
msg := bcproto.Message{}
switch pb := pb.(type) {
case *bcproto.BlockRequest:
msg.Sum = &bcproto.Message_BlockRequest{BlockRequest: pb}
case *bcproto.BlockResponse:
msg.Sum = &bcproto.Message_BlockResponse{BlockResponse: pb}
case *bcproto.NoBlockResponse:
msg.Sum = &bcproto.Message_NoBlockResponse{NoBlockResponse: pb}
case *bcproto.StatusRequest:
msg.Sum = &bcproto.Message_StatusRequest{StatusRequest: pb}
case *bcproto.StatusResponse:
msg.Sum = &bcproto.Message_StatusResponse{StatusResponse: pb}
default:
return nil, fmt.Errorf("unknown message type %T", pb)
}
bz, err := proto.Marshal(&msg)
if err != nil {
return nil, fmt.Errorf("unable to marshal %T: %w", pb, err)
}
return bz, nil
}
// DecodeMsg decodes a Protobuf message.
func DecodeMsg(bz []byte) (proto.Message, error) {
pb := &bcproto.Message{}
err := proto.Unmarshal(bz, pb)
if err != nil {
return nil, err
}
switch msg := pb.Sum.(type) {
case *bcproto.Message_BlockRequest:
return msg.BlockRequest, nil
case *bcproto.Message_BlockResponse:
return msg.BlockResponse, nil
case *bcproto.Message_NoBlockResponse:
return msg.NoBlockResponse, nil
case *bcproto.Message_StatusRequest:
return msg.StatusRequest, nil
case *bcproto.Message_StatusResponse:
return msg.StatusResponse, nil
default:
return nil, fmt.Errorf("unknown message type %T", msg)
}
}
// ValidateMsg validates a message.
func ValidateMsg(pb proto.Message) error {
if pb == nil {

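As a reference for the helpers restored above, here is a round-trip sketch showing EncodeMsg and DecodeMsg inverting each other; the package name and the bcproto import path are assumptions about this tree's layout, not verified repo code:

```go
package blocksync // assumed package of EncodeMsg/DecodeMsg above

import (
	"testing"

	bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync" // assumed path
)

// TestEncodeDecodeRoundTrip is an illustrative sketch, not a test from the
// repo: bytes produced by EncodeMsg should decode back to the same message.
func TestEncodeDecodeRoundTrip(t *testing.T) {
	bz, err := EncodeMsg(&bcproto.BlockRequest{Height: 42})
	if err != nil {
		t.Fatal(err)
	}
	msg, err := DecodeMsg(bz)
	if err != nil {
		t.Fatal(err)
	}
	req, ok := msg.(*bcproto.BlockRequest)
	if !ok || req.Height != 42 {
		t.Fatalf("unexpected round-trip result: %#v", msg)
	}
}
```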

@@ -80,7 +80,7 @@ func TestBcStatusResponseMessageValidateBasic(t *testing.T) {
}
//nolint:lll // ignore line length in tests
func TestBlocksyncMessageVectors(t *testing.T) {
func TestBlockchainMessageVectors(t *testing.T) {
block := types.MakeBlock(int64(3), []types.Tx{types.Tx("Hello World")}, nil, nil)
block.Version.Block = 11 // overwrite updated protocol version


@@ -32,7 +32,6 @@ const (
maxTotalRequesters = 600
maxPendingRequests = maxTotalRequesters
maxPendingRequestsPerPeer = 20
requestRetrySeconds = 30
// Minimum recv rate to ensure we're receiving blocks from a peer fast
// enough. If a peer is not sending us data at at least that rate, we
@@ -603,7 +602,7 @@ OUTER_LOOP:
}
peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
if peer == nil {
bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height)
// log.Info("No peers available", "height", height)
time.Sleep(requestIntervalMS * time.Millisecond)
continue PICK_PEER_LOOP
}
@@ -613,7 +612,6 @@ OUTER_LOOP:
bpr.peerID = peer.id
bpr.mtx.Unlock()
to := time.NewTimer(requestRetrySeconds * time.Second)
// Send request and wait.
bpr.pool.sendRequest(bpr.height, peer.id)
WAIT_LOOP:
@@ -626,11 +624,6 @@ OUTER_LOOP:
return
case <-bpr.Quit():
return
case <-to.C:
bpr.Logger.Debug("Retrying block request after timeout", "height", bpr.height, "peer", bpr.peerID)
// Simulate a redo
bpr.reset()
continue OUTER_LOOP
case peerID := <-bpr.redoCh:
if peerID == bpr.peerID {
bpr.reset()

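The hunk above removes a timer-driven retry from the requester loop (requestRetrySeconds plus the `<-to.C` case). The standalone sketch below illustrates that pattern with invented names; it is not code from pool.go:

```go
package main

import (
	"fmt"
	"time"
)

// requestWithRetry mirrors the removed logic: send, then wait on a timer
// alongside the quit channel; on timeout, reset and re-enter the outer loop.
func requestWithRetry(send func(), quit <-chan struct{}, retryEvery time.Duration) {
OUTER_LOOP:
	for {
		send()
		to := time.NewTimer(retryEvery)
		select {
		case <-quit:
			to.Stop()
			return
		case <-to.C:
			fmt.Println("block request timed out; retrying")
			continue OUTER_LOOP
		}
	}
}

func main() {
	quit := make(chan struct{})
	go func() { time.Sleep(250 * time.Millisecond); close(quit) }()
	requestWithRetry(func() { fmt.Println("sending block request") }, quit, 100*time.Millisecond)
}
```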

@@ -30,7 +30,7 @@ const (
)
type consensusReactor interface {
// for when we switch from blocksync reactor and block sync to
// for when we switch from blockchain reactor and block sync to
// the consensus machine
SwitchToConsensus(state sm.State, skipWAL bool)
}
@@ -143,20 +143,21 @@ func (bcR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: MaxMsgSize,
MessageType: &bcproto.Message{},
},
}
}
// AddPeer implements Reactor by sending our state to peer.
func (bcR *Reactor) AddPeer(peer p2p.Peer) {
peer.Send(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height(),
},
})
msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height()})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return
}
peer.Send(BlocksyncChannel, msgBytes)
// it's OK if send fails. will try later in poolRoutine
// peer is added to the pool once we receive the first
@@ -181,53 +182,69 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
return false
}
return src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.BlockResponse{Block: bl},
})
msgBytes, err := EncodeMsg(&bcproto.BlockResponse{Block: bl})
if err != nil {
bcR.Logger.Error("could not marshal msg", "err", err)
return false
}
return src.TrySend(BlocksyncChannel, msgBytes)
}
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
return src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.NoBlockResponse{Height: msg.Height},
})
msgBytes, err := EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return false
}
return src.TrySend(BlocksyncChannel, msgBytes)
}
// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *Reactor) Receive(e p2p.Envelope) {
if err := ValidateMsg(e.Message); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
bcR.Switch.StopPeerForError(e.Src, err)
func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := DecodeMsg(msgBytes)
if err != nil {
bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
bcR.Switch.StopPeerForError(src, err)
return
}
bcR.Logger.Debug("Receive", "e.Src", e.Src, "chID", e.ChannelID, "msg", e.Message)
if err = ValidateMsg(msg); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
bcR.Switch.StopPeerForError(src, err)
return
}
switch msg := e.Message.(type) {
bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)
switch msg := msg.(type) {
case *bcproto.BlockRequest:
bcR.respondToPeer(msg, e.Src)
bcR.respondToPeer(msg, src)
case *bcproto.BlockResponse:
bi, err := types.BlockFromProto(msg.Block)
if err != nil {
bcR.Logger.Error("Block content is invalid", "err", err)
return
}
bcR.pool.AddBlock(e.Src.ID(), bi, msg.Block.Size())
bcR.pool.AddBlock(src.ID(), bi, len(msgBytes))
case *bcproto.StatusRequest:
// Send peer our state.
e.Src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
},
msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return
}
src.TrySend(BlocksyncChannel, msgBytes)
case *bcproto.StatusResponse:
// Got a peer status. Unverified.
bcR.pool.SetPeerRange(e.Src.ID(), msg.Base, msg.Height)
bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height)
case *bcproto.NoBlockResponse:
bcR.Logger.Debug("Peer does not have requested block", "peer", e.Src, "height", msg.Height)
bcR.Logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height)
default:
bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
@@ -268,10 +285,13 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {
if peer == nil {
continue
}
queued := peer.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.BlockRequest{Height: request.Height},
})
msgBytes, err := EncodeMsg(&bcproto.BlockRequest{Height: request.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
continue
}
queued := peer.TrySend(BlocksyncChannel, msgBytes)
if !queued {
bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
}
@@ -410,9 +430,13 @@ FOR_LOOP:
// BroadcastStatusRequest broadcasts `BlockStore` base and height.
func (bcR *Reactor) BroadcastStatusRequest() error {
bcR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusRequest{},
})
bm, err := EncodeMsg(&bcproto.StatusRequest{})
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
return fmt.Errorf("could not convert msg to proto: %w", err)
}
bcR.Switch.Broadcast(BlocksyncChannel, bm)
return nil
}

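This file shows the core shape of the migration: the Envelope-based `Receive(e p2p.Envelope)` and `Send(p2p.Envelope{...})` calls are replaced by the older byte-oriented API, where the reactor decodes raw bytes itself. A trimmed, self-contained sketch of that restored surface follows; the types are stand-ins, not the real p2p package, and the channel ID is illustrative:

```go
package main

import "fmt"

// BlocksyncChannel uses 0x40 here on the assumption that it matches the
// historical blockchain reactor channel ID; treat the value as illustrative.
const BlocksyncChannel = byte(0x40)

type peer struct{ id string }

func (p peer) Send(chID byte, msgBytes []byte) bool { return true }

type reactor struct{}

// Receive mirrors the restored signature: the switch hands the reactor raw
// bytes, and the reactor decodes and validates before dispatching — in the
// Envelope form, decoding happened before the reactor was called.
func (r reactor) Receive(chID byte, src peer, msgBytes []byte) {
	fmt.Printf("peer=%s chID=%#x payload=%d bytes\n", src.id, chID, len(msgBytes))
}

func main() {
	reactor{}.Receive(BlocksyncChannel, peer{id: "p1"}, []byte{0x01, 0x02})
}
```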

@@ -146,13 +146,13 @@ func newReactor(
}
bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blocksync"))
bcReactor.SetLogger(logger.With("module", "blockchain"))
return ReactorPair{bcReactor, proxyApp}
}
func TestNoBlockResponse(t *testing.T) {
config = test.ResetTestRoot("blocksync_reactor_test")
config = test.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
@@ -164,7 +164,7 @@ func TestNoBlockResponse(t *testing.T) {
reactorPairs[1] = newReactor(t, log.TestingLogger(), genDoc, privVals, 0)
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKSYNC", reactorPairs[i].reactor)
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s
}, p2p.Connect2Switches)
@@ -214,7 +214,7 @@ func TestNoBlockResponse(t *testing.T) {
// Alternatively we could actually dial a TCP conn but
// that seems extreme.
func TestBadBlockStopsPeer(t *testing.T) {
config = test.ResetTestRoot("blocksync_reactor_test")
config = test.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
@@ -239,7 +239,7 @@ func TestBadBlockStopsPeer(t *testing.T) {
reactorPairs[3] = newReactor(t, log.TestingLogger(), genDoc, privVals, 0)
switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKSYNC", reactorPairs[i].reactor)
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s
}, p2p.Connect2Switches)
@@ -278,7 +278,7 @@ func TestBadBlockStopsPeer(t *testing.T) {
reactorPairs = append(reactorPairs, lastReactorPair)
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKSYNC", reactorPairs[len(reactorPairs)-1].reactor)
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
return s
}, p2p.Connect2Switches)...)


@@ -14,11 +14,6 @@ var VersionCmd = &cobra.Command{
Use: "version",
Short: "Show version info",
Run: func(cmd *cobra.Command, args []string) {
tmVersion := version.TMCoreSemVer
if version.TMGitCommitHash != "" {
tmVersion += "+" + version.TMGitCommitHash
}
if verbose {
values, _ := json.MarshalIndent(struct {
Tendermint string `json:"tendermint"`
@@ -26,14 +21,14 @@ var VersionCmd = &cobra.Command{
BlockProtocol uint64 `json:"block_protocol"`
P2PProtocol uint64 `json:"p2p_protocol"`
}{
Tendermint: tmVersion,
ABCI: version.ABCISemVer,
Tendermint: version.TMCoreSemVer,
ABCI: version.ABCIVersion,
BlockProtocol: version.BlockProtocol,
P2PProtocol: version.P2PProtocol,
}, "", " ")
fmt.Println(string(values))
} else {
fmt.Println(tmVersion)
fmt.Println(version.TMCoreSemVer)
}
},
}


@@ -26,7 +26,6 @@ import (
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
@@ -166,16 +165,10 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
for i, peer := range peerList {
if i < len(peerList)/2 {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer)
peer.Send(p2p.Envelope{
Message: &tmcons.Vote{prevote1.ToProto()},
ChannelID: VoteChannel,
})
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote1}))
} else {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer)
peer.Send(p2p.Envelope{
Message: &tmcons.Vote{prevote2.ToProto()},
ChannelID: VoteChannel,
})
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote2}))
}
}
} else {
@@ -527,26 +520,18 @@ func sendProposalAndParts(
parts *types.PartSet,
) {
// proposal
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.Proposal{Proposal: *proposal.ToProto()},
})
msg := &ProposalMessage{Proposal: proposal}
peer.Send(DataChannel, MustEncode(msg))
// parts
for i := 0; i < int(parts.Total()); i++ {
part := parts.GetPart(i)
pp, err := part.ToProto()
if err != nil {
panic(err) // TODO: wbanfield better error handling
msg := &BlockPartMessage{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: part,
}
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.BlockPart{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: *pp,
},
})
peer.Send(DataChannel, MustEncode(msg))
}
// votes
@@ -554,14 +539,9 @@ func sendProposalAndParts(
prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header())
precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header())
cs.mtx.Unlock()
peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: &tmcons.Vote{prevote.ToProto()},
})
peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: &tmcons.Vote{precommit.ToProto()},
})
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
}
//----------------------------------------
@@ -599,7 +579,7 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
br.reactor.RemovePeer(peer, reason)
}
func (br *ByzantineReactor) Receive(e p2p.Envelope) {
br.reactor.Receive(e)
func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
br.reactor.Receive(chID, peer, msgBytes)
}
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }


@@ -7,7 +7,6 @@ import (
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
@@ -95,10 +94,7 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw
peers := sw.Peers().List()
for _, peer := range peers {
cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer)
peer.Send(p2p.Envelope{
Message: &tmcons.Vote{precommit.ToProto()},
ChannelID: VoteChannel,
})
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
}
}()
}


@@ -4,6 +4,8 @@ import (
"errors"
"fmt"
"github.com/cosmos/gogoproto/proto"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
tmmath "github.com/tendermint/tendermint/libs/math"
@@ -13,9 +15,7 @@ import (
"github.com/tendermint/tendermint/types"
)
// MsgToProto takes a consensus message type and returns the proto defined consensus message.
//
// TODO: This needs to be removed, but WALToProto depends on this.
// MsgToProto takes a consensus message type and returns the proto defined consensus message
func MsgToProto(msg Message) (*tmcons.Message, error) {
if msg == nil {
return nil, errors.New("consensus: message is nil")
@@ -260,6 +260,20 @@ func MsgFromProto(msg *tmcons.Message) (Message, error) {
return pb, nil
}
// MustEncode takes the reactor's msg, makes it proto and marshals it
// this mimics `MustMarshalBinaryBare` in that it panics on error
func MustEncode(msg Message) []byte {
pb, err := MsgToProto(msg)
if err != nil {
panic(err)
}
enc, err := proto.Marshal(pb)
if err != nil {
panic(err)
}
return enc
}
// WALToProto takes a WAL message and return a proto walMessage and error
func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
var pb tmcons.WALMessage

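MustEncode above follows the common Go Must* convention: wrap a fallible encoder and panic, for call sites where failure can only mean a programming bug (as its reference to MustMarshalBinaryBare suggests). A generic, self-contained sketch of the pattern, with JSON standing in for the proto marshalling:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encode stands in for MsgToProto followed by proto.Marshal.
func encode(v interface{}) ([]byte, error) { return json.Marshal(v) }

// mustEncode mimics MustEncode: it trades the error return for a panic,
// so call sites like peer.Send(ch, mustEncode(msg)) stay single-expression.
func mustEncode(v interface{}) []byte {
	bz, err := encode(v)
	if err != nil {
		panic(err)
	}
	return bz
}

func main() {
	fmt.Printf("%s\n", mustEncode(map[string]int{"height": 1, "round": 0}))
}
```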

@@ -7,6 +7,8 @@ import (
"sync"
"time"
"github.com/cosmos/gogoproto/proto"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
tmevents "github.com/tendermint/tendermint/libs/events"
@@ -146,7 +148,6 @@ func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
Priority: 6,
SendQueueCapacity: 100,
RecvMessageCapacity: maxMsgSize,
MessageType: &tmcons.Message{},
},
{
ID: DataChannel, // maybe split between gossiping current block and catchup stuff
@@ -155,7 +156,6 @@ func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
SendQueueCapacity: 100,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: maxMsgSize,
MessageType: &tmcons.Message{},
},
{
ID: VoteChannel,
@@ -163,7 +163,6 @@ func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
SendQueueCapacity: 100,
RecvBufferCapacity: 100 * 100,
RecvMessageCapacity: maxMsgSize,
MessageType: &tmcons.Message{},
},
{
ID: VoteSetBitsChannel,
@@ -171,7 +170,6 @@ func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
SendQueueCapacity: 2,
RecvBufferCapacity: 1024,
RecvMessageCapacity: maxMsgSize,
MessageType: &tmcons.Message{},
},
}
}
@@ -225,44 +223,34 @@ func (conR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Peer state updates can happen in parallel, but processing of
// proposals, block parts, and votes are ordered by the receiveRoutine
// NOTE: blocks on consensus state for proposals, block parts, and votes
func (conR *Reactor) Receive(e p2p.Envelope) {
func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
if !conR.IsRunning() {
conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID)
conR.Logger.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)
return
}
if w, ok := e.Message.(p2p.Wrapper); ok {
var err error
e.Message, err = w.Wrap()
if err != nil {
conR.Logger.Error("Error wrapping message", "src", e.Src, "chId", e.ChannelID, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
}
msg, err := MsgFromProto(e.Message.(*tmcons.Message))
msg, err := decodeMsg(msgBytes)
if err != nil {
conR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
conR.Switch.StopPeerForError(src, err)
return
}
if err = msg.ValidateBasic(); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(src, err)
return
}
conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg)
conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
// Get peer states
ps, ok := e.Src.Get(types.PeerStateKey).(*PeerState)
ps, ok := src.Get(types.PeerStateKey).(*PeerState)
if !ok {
panic(fmt.Sprintf("Peer %v has no state", e.Src))
panic(fmt.Sprintf("Peer %v has no state", src))
}
switch e.ChannelID {
switch chID {
case StateChannel:
switch msg := msg.(type) {
case *NewRoundStepMessage:
@@ -270,8 +258,8 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
initialHeight := conR.conS.state.InitialHeight
conR.conS.mtx.Unlock()
if err = msg.ValidateHeight(initialHeight); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(src, err)
return
}
ps.ApplyNewRoundStepMessage(msg)
@@ -290,7 +278,7 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
// Peer claims to have a maj23 for some BlockID at H,R,S,
err := votes.SetPeerMaj23(msg.Round, msg.Type, ps.peer.ID(), msg.BlockID)
if err != nil {
conR.Switch.StopPeerForError(e.Src, err)
conR.Switch.StopPeerForError(src, err)
return
}
// Respond with a VoteSetBitsMessage showing which votes we have.
@@ -304,19 +292,13 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
default:
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
}
eMsg := &tmcons.VoteSetBits{
src.TrySend(VoteSetBitsChannel, MustEncode(&VoteSetBitsMessage{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID.ToProto(),
}
if votes := ourVotes.ToProto(); votes != nil {
eMsg.Votes = *votes
}
e.Src.TrySend(p2p.Envelope{
ChannelID: VoteSetBitsChannel,
Message: eMsg,
})
BlockID: msg.BlockID,
Votes: ourVotes,
}))
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
@@ -329,13 +311,13 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
switch msg := msg.(type) {
case *ProposalMessage:
ps.SetHasProposal(msg.Proposal)
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
@@ -355,7 +337,7 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote)
cs.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
cs.peerMsgQueue <- msgInfo{msg, src.ID()}
default:
// don't punish (leave room for soft upgrades)
@@ -394,7 +376,7 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
}
default:
conR.Logger.Error(fmt.Sprintf("Unknown chId %X", e.ChannelID))
conR.Logger.Error(fmt.Sprintf("Unknown chId %X", chID))
}
}
@@ -448,39 +430,29 @@ func (conR *Reactor) unsubscribeFromBroadcastEvents() {
func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
nrsMsg := makeRoundStepMessage(rs)
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: nrsMsg,
})
conR.Switch.Broadcast(StateChannel, MustEncode(nrsMsg))
}
func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
psh := rs.ProposalBlockParts.Header()
csMsg := &tmcons.NewValidBlock{
csMsg := &NewValidBlockMessage{
Height: rs.Height,
Round: rs.Round,
BlockPartSetHeader: psh.ToProto(),
BlockParts: rs.ProposalBlockParts.BitArray().ToProto(),
BlockPartSetHeader: rs.ProposalBlockParts.Header(),
BlockParts: rs.ProposalBlockParts.BitArray(),
IsCommit: rs.Step == cstypes.RoundStepCommit,
}
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: csMsg,
})
conR.Switch.Broadcast(StateChannel, MustEncode(csMsg))
}
// Broadcasts HasVoteMessage to peers that care.
func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
msg := &tmcons.HasVote{
msg := &HasVoteMessage{
Height: vote.Height,
Round: vote.Round,
Type: vote.Type,
Index: vote.ValidatorIndex,
}
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: msg,
})
conR.Switch.Broadcast(StateChannel, MustEncode(msg))
/*
// TODO: Make this broadcast more selective.
for _, peer := range conR.Switch.Peers().List() {
@@ -491,11 +463,7 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
prs := ps.GetRoundState()
if prs.Height == vote.Height {
// TODO: Also filter on round?
e := p2p.Envelope{
ChannelID: StateChannel, struct{ ConsensusMessage }{msg},
Message: p,
}
peer.TrySend(e)
peer.TrySend(StateChannel, struct{ ConsensusMessage }{msg})
} else {
// Height doesn't match
// TODO: check a field, maybe CatchupCommitRound?
@@ -505,11 +473,11 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
*/
}
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *tmcons.NewRoundStep) {
nrsMsg = &tmcons.NewRoundStep{
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage) {
nrsMsg = &NewRoundStepMessage{
Height: rs.Height,
Round: rs.Round,
Step: uint32(rs.Step),
Step: rs.Step,
SecondsSinceStartTime: int64(time.Since(rs.StartTime).Seconds()),
LastCommitRound: rs.LastCommit.GetRound(),
}
@@ -519,10 +487,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *tmcons.NewRoundStep)
func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
rs := conR.getRoundState()
nrsMsg := makeRoundStepMessage(rs)
peer.Send(p2p.Envelope{
ChannelID: StateChannel,
Message: nrsMsg,
})
peer.Send(StateChannel, MustEncode(nrsMsg))
}
func (conR *Reactor) updateRoundStateRoutine() {
@@ -561,19 +526,13 @@ OUTER_LOOP:
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) {
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
part := rs.ProposalBlockParts.GetPart(index)
parts, err := part.ToProto()
if err != nil {
panic(err)
msg := &BlockPartMessage{
Height: rs.Height, // This tells peer that this part applies to us.
Round: rs.Round, // This tells peer that this part applies to us.
Part: part,
}
logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.BlockPart{
Height: rs.Height, // This tells peer that this part applies to us.
Round: rs.Round, // This tells peer that this part applies to us.
Part: *parts,
},
}) {
if peer.Send(DataChannel, MustEncode(msg)) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
continue OUTER_LOOP
@@ -619,11 +578,9 @@ OUTER_LOOP:
if rs.Proposal != nil && !prs.Proposal {
// Proposal: share the proposal metadata with peer.
{
msg := &ProposalMessage{Proposal: rs.Proposal}
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.Proposal{Proposal: *rs.Proposal.ToProto()},
}) {
if peer.Send(DataChannel, MustEncode(msg)) {
// NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected!
ps.SetHasProposal(rs.Proposal)
}
@@ -633,15 +590,13 @@ OUTER_LOOP:
// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
if 0 <= rs.Proposal.POLRound {
msg := &ProposalPOLMessage{
Height: rs.Height,
ProposalPOLRound: rs.Proposal.POLRound,
ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
}
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.ProposalPOL{
Height: rs.Height,
ProposalPolRound: rs.Proposal.POLRound,
ProposalPol: *rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray().ToProto(),
},
})
peer.Send(DataChannel, MustEncode(msg))
}
continue OUTER_LOOP
}
@@ -678,20 +633,13 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
return
}
// Send the part
logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
pp, err := part.ToProto()
if err != nil {
logger.Error("Could not convert part to proto", "index", index, "error", err)
return
msg := &BlockPartMessage{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: part,
}
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.BlockPart{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: *pp,
},
}) {
logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
if peer.Send(DataChannel, MustEncode(msg)) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
} else {
logger.Debug("Sending block part for catchup failed")
@@ -850,16 +798,12 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrevoteType,
BlockID: maj23.ToProto(),
},
})
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrevoteType,
BlockID: maj23,
}))
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
@@ -871,15 +815,12 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrecommitType,
BlockID: maj23.ToProto(),
},
})
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrecommitType,
BlockID: maj23,
}))
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
@@ -891,16 +832,12 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.ProposalPOLRound,
Type: tmproto.PrevoteType,
BlockID: maj23.ToProto(),
},
})
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.ProposalPOLRound,
Type: tmproto.PrevoteType,
BlockID: maj23,
}))
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
@@ -915,15 +852,12 @@ OUTER_LOOP:
if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() &&
prs.Height >= conR.conS.blockStore.Base() {
if commit := conR.conS.LoadCommit(prs.Height); commit != nil {
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: commit.Round,
Type: tmproto.PrecommitType,
BlockID: commit.BlockID.ToProto(),
},
})
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
Height: prs.Height,
Round: commit.Round,
Type: tmproto.PrecommitType,
BlockID: commit.BlockID,
}))
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
@@ -1137,13 +1071,9 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(votes); ok {
msg := &VoteMessage{vote}
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
if ps.peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: &tmcons.Vote{
Vote: vote.ToProto(),
},
}) {
if ps.peer.Send(VoteChannel, MustEncode(msg)) {
ps.SetHasVote(vote)
return true
}
@@ -1509,6 +1439,15 @@ func init() {
tmjson.RegisterType(&VoteSetBitsMessage{}, "tendermint/VoteSetBits")
}
func decodeMsg(bz []byte) (msg Message, err error) {
pb := &tmcons.Message{}
if err = proto.Unmarshal(bz, pb); err != nil {
return msg, err
}
return MsgFromProto(pb)
}
//-------------------------------------
// NewRoundStepMessage is sent for every step taken in the ConsensusState.

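decodeMsg above is the inverse of MustEncode, and the reactor's Receive depends on that symmetry. A sketch of it as a test, assumed to live in the same consensus package; this is not a test from the repo:

```go
package consensus // assumed package of MustEncode/decodeMsg above

import (
	"reflect"
	"testing"

	tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
)

// TestDecodeInvertsMustEncode checks that bytes produced by MustEncode
// decode back to an equal Message, the property Receive relies on.
func TestDecodeInvertsMustEncode(t *testing.T) {
	in := &HasVoteMessage{Height: 1, Round: 1, Index: 1, Type: tmproto.PrevoteType}
	out, err := decodeMsg(MustEncode(in))
	if err != nil {
		t.Fatal(err)
	}
	if !reflect.DeepEqual(in, out) {
		t.Fatalf("round trip mismatch: got %#v, want %#v", out, in)
	}
}
```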

@@ -33,7 +33,6 @@ import (
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
p2pmock "github.com/tendermint/tendermint/p2p/mock"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
statemocks "github.com/tendermint/tendermint/state/mocks"
@@ -266,18 +265,15 @@ func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {
var (
reactor = reactors[0]
peer = p2pmock.NewPeer(nil)
msg = MustEncode(&HasVoteMessage{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType})
)
reactor.InitPeer(peer)
// simulate switch calling Receive before AddPeer
assert.NotPanics(t, func() {
reactor.Receive(p2p.Envelope{
ChannelID: StateChannel,
Src: peer,
Message: &tmcons.HasVote{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType},
})
reactor.Receive(StateChannel, peer, msg)
reactor.AddPeer(peer)
})
}
@@ -292,18 +288,15 @@ func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
var (
reactor = reactors[0]
peer = p2pmock.NewPeer(nil)
msg = MustEncode(&HasVoteMessage{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType})
)
// we should call InitPeer here
// simulate switch calling Receive before AddPeer
assert.Panics(t, func() {
reactor.Receive(p2p.Envelope{
ChannelID: StateChannel,
Src: peer,
Message: &tmcons.HasVote{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType},
})
reactor.Receive(StateChannel, peer, msg)
})
}


@@ -735,7 +735,7 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin
// Prune block store if requested
expectError := false
if mode == 3 {
pruned, _, err := store.PruneBlocks(2, state)
pruned, err := store.PruneBlocks(2)
require.NoError(t, err)
require.EqualValues(t, 1, pruned)
expectError = int64(nBlocks) < 2
@@ -1185,8 +1185,7 @@ func (bs *mockBlockStore) LoadSeenCommit(height int64) *types.Commit {
return bs.commits[height-1]
}
func (bs *mockBlockStore) PruneBlocks(height int64, state sm.State) (uint64, int64, error) {
evidencePoint := height
func (bs *mockBlockStore) PruneBlocks(height int64) (uint64, error) {
pruned := uint64(0)
for i := int64(0); i < height-1; i++ {
bs.chain[i] = nil
@@ -1194,7 +1193,7 @@ func (bs *mockBlockStore) PruneBlocks(height int64, state sm.State) (uint64, int
pruned++
}
bs.base = height
return pruned, evidencePoint, nil
return pruned, nil
}
func (bs *mockBlockStore) DeleteLatestBlock() error { return nil }


@@ -4,7 +4,6 @@ import (
"fmt"
"time"
"github.com/cosmos/gogoproto/proto"
clist "github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
@@ -56,7 +55,6 @@ func (evR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
ID: EvidenceChannel,
Priority: 6,
RecvMessageCapacity: maxMsgSize,
MessageType: &tmproto.EvidenceList{},
},
}
}
@@ -68,11 +66,11 @@ func (evR *Reactor) AddPeer(peer p2p.Peer) {
// Receive implements Reactor.
// It adds any received evidence to the evpool.
func (evR *Reactor) Receive(e p2p.Envelope) {
evis, err := evidenceListFromProto(e.Message)
func (evR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
evis, err := decodeMsg(msgBytes)
if err != nil {
evR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
evR.Switch.StopPeerForError(e.Src, err)
evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
evR.Switch.StopPeerForError(src, err)
return
}
@@ -82,7 +80,7 @@ func (evR *Reactor) Receive(e p2p.Envelope) {
case *types.ErrInvalidEvidence:
evR.Logger.Error(err.Error())
// punish peer
evR.Switch.StopPeerForError(e.Src, err)
evR.Switch.StopPeerForError(src, err)
return
case nil:
default:
@@ -128,15 +126,11 @@ func (evR *Reactor) broadcastEvidenceRoutine(peer p2p.Peer) {
evis := evR.prepareEvidenceMessage(peer, ev)
if len(evis) > 0 {
evR.Logger.Debug("Gossiping evidence to peer", "ev", ev, "peer", peer)
evp, err := evidenceListToProto(evis)
msgBytes, err := encodeMsg(evis)
if err != nil {
panic(err)
}
success := peer.Send(p2p.Envelope{
ChannelID: EvidenceChannel,
Message: evp,
})
success := peer.Send(EvidenceChannel, msgBytes)
if !success {
time.Sleep(peerRetryMessageIntervalMS * time.Millisecond)
continue
@@ -216,7 +210,7 @@ type PeerState interface {
// encodeMsg takes an array of evidence
// returns the byte encoding of the List Message
func evidenceListToProto(evis []types.Evidence) (*tmproto.EvidenceList, error) {
func encodeMsg(evis []types.Evidence) ([]byte, error) {
evi := make([]tmproto.Evidence, len(evis))
for i := 0; i < len(evis); i++ {
ev, err := types.EvidenceToProto(evis[i])
@@ -228,13 +222,19 @@ func evidenceListToProto(evis []types.Evidence) (*tmproto.EvidenceList, error) {
epl := tmproto.EvidenceList{
Evidence: evi,
}
return &epl, nil
return epl.Marshal()
}
func evidenceListFromProto(m proto.Message) ([]types.Evidence, error) {
lm := m.(*tmproto.EvidenceList)
// decodeMsg takes an array of bytes
// returns an array of evidence
func decodeMsg(bz []byte) (evis []types.Evidence, err error) {
lm := tmproto.EvidenceList{}
if err := lm.Unmarshal(bz); err != nil {
return nil, err
}
evis := make([]types.Evidence, len(lm.Evidence))
evis = make([]types.Evidence, len(lm.Evidence))
for i := 0; i < len(lm.Evidence); i++ {
ev, err := types.EvidenceFromProto(&lm.Evidence[i])
if err != nil {

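encodeMsg and decodeMsg above go through tmproto.EvidenceList's generated Marshal/Unmarshal. A minimal round-trip sketch of that wire format follows; the import path is an assumption, and an empty list keeps the example self-contained, since real evidence values need signed votes:

```go
package main

import (
	"fmt"

	tmproto "github.com/tendermint/tendermint/proto/tendermint/types" // assumed path
)

func main() {
	// encodeMsg fills Evidence from []types.Evidence via types.EvidenceToProto;
	// an empty list round-trips through the same generated methods.
	epl := tmproto.EvidenceList{}
	bz, err := epl.Marshal()
	if err != nil {
		panic(err)
	}

	var back tmproto.EvidenceList
	if err := back.Unmarshal(bz); err != nil {
		panic(err)
	}
	fmt.Printf("round-tripped %d bytes, %d evidence items\n", len(bz), len(back.Evidence))
}
```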

@@ -208,10 +208,7 @@ func TestReactorBroadcastEvidenceMemoryLeak(t *testing.T) {
// i.e. broadcastEvidenceRoutine finishes when peer is stopped
defer leaktest.CheckTimeout(t, 10*time.Second)()
p.On("Send", mock.MatchedBy(func(i interface{}) bool {
e, ok := i.(p2p.Envelope)
return ok && e.ChannelID == evidence.EvidenceChannel
})).Return(false)
p.On("Send", evidence.EvidenceChannel, mock.AnythingOfType("[]uint8")).Return(false)
quitChan := make(<-chan struct{})
p.On("Quit").Return(quitChan)
ps := peerState{2}


@@ -21,6 +21,7 @@ func (evpool *Pool) verify(evidence types.Evidence) error {
state = evpool.State()
height = state.LastBlockHeight
evidenceParams = state.ConsensusParams.Evidence
ageNumBlocks = height - evidence.Height()
)
// verify the time of the evidence
@@ -33,9 +34,10 @@ func (evpool *Pool) verify(evidence types.Evidence) error {
return fmt.Errorf("evidence has a different time to the block it is associated with (%v != %v)",
evidence.Time(), evTime)
}
ageDuration := state.LastBlockTime.Sub(evTime)
// checking if evidence is expired calculated using the block evidence time and height
if IsEvidenceExpired(height, state.LastBlockTime, evidence.Height(), evTime, evidenceParams) {
// check that the evidence hasn't expired
if ageDuration > evidenceParams.MaxAgeDuration && ageNumBlocks > evidenceParams.MaxAgeNumBlocks {
return fmt.Errorf(
"evidence from height %d (created at: %v) is too old; min height is %d and evidence can not be older than %v",
evidence.Height(),
@@ -282,14 +284,3 @@ func getSignedHeader(blockStore BlockStore, height int64) (*types.SignedHeader,
Commit: commit,
}, nil
}
// check that the evidence hasn't expired
func IsEvidenceExpired(heightNow int64, timeNow time.Time, heightEv int64, timeEv time.Time, evidenceParams types.EvidenceParams) bool {
ageDuration := timeNow.Sub(timeEv)
ageNumBlocks := heightNow - heightEv
if ageDuration > evidenceParams.MaxAgeDuration && ageNumBlocks > evidenceParams.MaxAgeNumBlocks {
return true
}
return false
}
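
The restored inline check makes the expiry rule explicit: evidence is rejected only when it is too old by both wall-clock age and block age. A self-contained sketch of that predicate with illustrative (non-default) parameters:

```go
package main

import (
	"fmt"
	"time"
)

type evidenceParams struct {
	MaxAgeNumBlocks int64
	MaxAgeDuration  time.Duration
}

// isExpired mirrors the condition above: both limits must be exceeded.
func isExpired(heightNow, heightEv int64, timeNow, timeEv time.Time, p evidenceParams) bool {
	ageDuration := timeNow.Sub(timeEv)
	ageNumBlocks := heightNow - heightEv
	return ageDuration > p.MaxAgeDuration && ageNumBlocks > p.MaxAgeNumBlocks
}

func main() {
	p := evidenceParams{MaxAgeNumBlocks: 100, MaxAgeDuration: 48 * time.Hour}
	now := time.Now()

	// Too old by duration but within the block limit: still valid.
	fmt.Println(isExpired(150, 100, now, now.Add(-72*time.Hour), p)) // false

	// Too old by both duration and blocks: expired.
	fmt.Println(isExpired(300, 100, now, now.Add(-72*time.Hour), p)) // true
}
```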

go.mod

@@ -34,7 +34,7 @@ require (
github.com/tendermint/tm-db v0.6.6
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa
golang.org/x/net v0.0.0-20220812174116-3211cb980234
google.golang.org/grpc v1.50.0
google.golang.org/grpc v1.49.0
)
require (

go.sum

@@ -56,7 +56,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/toml v1.2.0 h1:Rt8g24XnyGTyglgET/PRUNlrUeu9F5L+7FilkXfZgs0=
github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d/go.mod h1:URdX5+vg25ts3aCh8H5IFZybJYKWhJHYMTnf+ULtoC4=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/zstd v1.4.1 h1:3oxKN3wbHibqx897utPC2LTQU4J+IHWWJO+glkAkpFM=
@@ -235,6 +234,7 @@ github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cosmos/go-bip39 v0.0.0-20180819234021-555e2067c45d h1:49RLWk1j44Xu4fjHb6JFYmeUnDORVwHNkDxaQ0ctCVU=
github.com/cosmos/go-bip39 v0.0.0-20180819234021-555e2067c45d/go.mod h1:tSxLoYXyBmiFeKpvmq4dzayMdCjCnu8uqmCysIGBT2Y=
github.com/cosmos/gogoproto v1.4.2 h1:UeGRcmFW41l0G0MiefWhkPEVEwvu78SZsHBvI78dAYw=
github.com/cosmos/gogoproto v1.4.2/go.mod h1:cLxOsn1ljAHSV527CHOtaIP91kK6cCrZETRBrkzItWU=
@@ -545,9 +545,6 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/gtank/merlin v0.1.1-0.20191105220539-8318aed1a79f/go.mod h1:T86dnYJhcGOh5BjZFCJWTDeTK7XW8uE+E21Cy/bIQ+s=
github.com/gtank/merlin v0.1.1/go.mod h1:T86dnYJhcGOh5BjZFCJWTDeTK7XW8uE+E21Cy/bIQ+s=
github.com/gtank/ristretto255 v0.1.2/go.mod h1:Ph5OpO6c7xKUGROZfWVLiJf9icMDwUeIvY4OmlYW69o=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
github.com/hashicorp/consul/api v1.10.1/go.mod h1:XjsvQN+RJGWI2TWy1/kqaE16HrR2J/FWgkYjdZQsX9M=
@@ -742,8 +739,6 @@ github.com/mgechev/revive v1.2.3/go.mod h1:iAWlQishqCuj4yhV24FTnKSXGpbAA+0SckXB8
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
github.com/miekg/dns v1.1.43/go.mod h1:+evo5L0630/F6ca/Z9+GAqzhjGyn8/c+TBaOyfEl0V4=
github.com/mimoo/StrobeGo v0.0.0-20181016162300-f8f6d4d2b643/go.mod h1:43+3pMjjKimDBf5Kr4ZFNGbLql1zKkbImw+fZbw3geM=
github.com/mimoo/StrobeGo v0.0.0-20210601165009-122bf33a46e0/go.mod h1:43+3pMjjKimDBf5Kr4ZFNGbLql1zKkbImw+fZbw3geM=
github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
@@ -805,10 +800,10 @@ github.com/nishanths/predeclared v0.2.2/go.mod h1:RROzoN6TnGQupbC+lqggsOlcgysk3L
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oasisprotocol/curve25519-voi v0.0.0-20220708102147-0a8a51822cae h1:FatpGJD2jmJfhZiFDElaC0QhZUDQnxUeAwTGkfAHN3I=
github.com/oasisprotocol/curve25519-voi v0.0.0-20220708102147-0a8a51822cae/go.mod h1:hVoHR2EVESiICEMbg137etN/Lx+lSrHPTD39Z/uE+2s=
github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/oasisprotocol/curve25519-voi v0.0.0-20220708102147-0a8a51822cae h1:FatpGJD2jmJfhZiFDElaC0QhZUDQnxUeAwTGkfAHN3I=
github.com/oasisprotocol/curve25519-voi v0.0.0-20220708102147-0a8a51822cae/go.mod h1:hVoHR2EVESiICEMbg137etN/Lx+lSrHPTD39Z/uE+2s=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
@@ -1207,7 +1202,6 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@@ -1696,8 +1690,8 @@ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k=
google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/grpc v1.50.0 h1:fPVVDxY9w++VjTZsYvXWqEf9Rqar/e+9zYfxKK+W+YU=
google.golang.org/grpc v1.50.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw=
google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=


@@ -134,7 +134,6 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
ID: mempool.MempoolChannel,
Priority: 5,
RecvMessageCapacity: batchMsg.Size(),
MessageType: &protomem.Message{},
},
}
}
@@ -155,34 +154,28 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *Reactor) Receive(e p2p.Envelope) {
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
switch msg := e.Message.(type) {
case *protomem.Txs:
protoTxs := msg.GetTxs()
if len(protoTxs) == 0 {
memR.Logger.Error("received empty txs from peer", "src", e.Src)
return
}
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
if e.Src != nil {
txInfo.SenderP2PID = e.Src.ID()
}
var err error
for _, tx := range protoTxs {
ntx := types.Tx(tx)
err = memR.mempool.CheckTx(ntx, nil, txInfo)
if errors.Is(err, mempool.ErrTxInCache) {
memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err)
}
}
default:
memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := memR.decodeMsg(msgBytes)
if err != nil {
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
memR.Switch.StopPeerForError(src, err)
return
}
memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(src)}
if src != nil {
txInfo.SenderP2PID = src.ID()
}
for _, tx := range msg.Txs {
err = memR.mempool.CheckTx(tx, nil, txInfo)
if errors.Is(err, mempool.ErrTxInCache) {
memR.Logger.Debug("Tx already exists in cache", "tx", tx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err)
}
}
// broadcasting happens from go routines per peer
}
@@ -241,14 +234,18 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
// https://github.com/tendermint/tendermint/issues/5796
if _, ok := memTx.senders.Load(peerID); !ok {
success := peer.Send(p2p.Envelope{
ChannelID: mempool.MempoolChannel,
Message: &protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
},
msg := protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
},
})
}
bz, err := msg.Marshal()
if err != nil {
panic(err)
}
success := peer.Send(mempool.MempoolChannel, bz)
if !success {
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
continue
@@ -267,6 +264,35 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
}
}
func (memR *Reactor) decodeMsg(bz []byte) (TxsMessage, error) {
msg := protomem.Message{}
err := msg.Unmarshal(bz)
if err != nil {
return TxsMessage{}, err
}
var message TxsMessage
if i, ok := msg.Sum.(*protomem.Message_Txs); ok {
txs := i.Txs.GetTxs()
if len(txs) == 0 {
return message, errors.New("empty TxsMessage")
}
decoded := make([]types.Tx, len(txs))
for j, tx := range txs {
decoded[j] = types.Tx(tx)
}
message = TxsMessage{
Txs: decoded,
}
return message, nil
}
return message, fmt.Errorf("msg type: %T is not supported", msg)
}
// TxsMessage is a Message containing transactions.
type TxsMessage struct {
Txs []types.Tx

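decodeMsg above parses a protomem.Message whose oneof wraps a Txs batch, matching the encode side in broadcastTxRoutine. A round-trip sketch of that wire message; the import path is an assumption:

```go
package main

import (
	"fmt"

	protomem "github.com/tendermint/tendermint/proto/tendermint/mempool" // assumed path
)

func main() {
	// Encode: the same shape broadcastTxRoutine marshals before peer.Send.
	msg := protomem.Message{
		Sum: &protomem.Message_Txs{
			Txs: &protomem.Txs{Txs: [][]byte{[]byte("tx1"), []byte("tx2")}},
		},
	}
	bz, err := msg.Marshal()
	if err != nil {
		panic(err)
	}

	// Decode: the unwrap step decodeMsg performs on received bytes.
	var back protomem.Message
	if err := back.Unmarshal(bz); err != nil {
		panic(err)
	}
	if i, ok := back.Sum.(*protomem.Message_Txs); ok {
		fmt.Printf("decoded %d txs from %d bytes\n", len(i.Txs.GetTxs()), len(bz))
	}
}
```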

@@ -22,7 +22,6 @@ import (
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/mock"
memproto "github.com/tendermint/tendermint/proto/tendermint/mempool"
protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)
@@ -280,12 +279,7 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) {
for i := 0; i < mempool.MaxActiveIDs+1; i++ {
peer := mock.NewPeer(nil)
reactor.Receive(p2p.Envelope{
ChannelID: mempool.MempoolChannel,
Src: peer,
Message: &protomem.Txs{
Txs: [][]byte{{0x01, 0x02, 0x03}},
}})
reactor.Receive(mempool.MempoolChannel, peer, []byte{0x1, 0x2, 0x3})
reactor.AddPeer(peer)
}
}


@@ -133,7 +133,6 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
ID: mempool.MempoolChannel,
Priority: 5,
RecvMessageCapacity: batchMsg.Size(),
MessageType: &protomem.Message{},
},
}
}
@@ -154,35 +153,27 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *Reactor) Receive(e p2p.Envelope) {
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
switch msg := e.Message.(type) {
case *protomem.Txs:
protoTxs := msg.GetTxs()
if len(protoTxs) == 0 {
memR.Logger.Error("received empty txs from peer", "src", e.Src)
return
}
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
if e.Src != nil {
txInfo.SenderP2PID = e.Src.ID()
}
var err error
for _, tx := range protoTxs {
ntx := types.Tx(tx)
err = memR.mempool.CheckTx(ntx, nil, txInfo)
if errors.Is(err, mempool.ErrTxInCache) {
memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err)
}
}
default:
memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := memR.decodeMsg(msgBytes)
if err != nil {
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
memR.Switch.StopPeerForError(src, err)
return
}
memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(src)}
if src != nil {
txInfo.SenderP2PID = src.ID()
}
for _, tx := range msg.Txs {
err = memR.mempool.CheckTx(tx, nil, txInfo)
if err == mempool.ErrTxInCache {
memR.Logger.Debug("Tx already exists in cache", "tx", tx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err)
}
}
// broadcasting happens from go routines per peer
}
@@ -242,14 +233,18 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
// NOTE: Transaction batching was disabled due to
// https://github.com/tendermint/tendermint/issues/5796
if !memTx.HasPeer(peerID) {
success := peer.Send(p2p.Envelope{
ChannelID: mempool.MempoolChannel,
Message: &protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
},
msg := protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
},
})
}
bz, err := msg.Marshal()
if err != nil {
panic(err)
}
success := peer.Send(mempool.MempoolChannel, bz)
if !success {
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
continue
@@ -273,6 +268,37 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
//-----------------------------------------------------------------------------
// Messages
func (memR *Reactor) decodeMsg(bz []byte) (TxsMessage, error) {
msg := protomem.Message{}
err := msg.Unmarshal(bz)
if err != nil {
return TxsMessage{}, err
}
var message TxsMessage
if i, ok := msg.Sum.(*protomem.Message_Txs); ok {
txs := i.Txs.GetTxs()
if len(txs) == 0 {
return message, errors.New("empty TxsMessage")
}
decoded := make([]types.Tx, len(txs))
for j, tx := range txs {
decoded[j] = types.Tx(tx)
}
message = TxsMessage{
Txs: decoded,
}
return message, nil
}
return message, fmt.Errorf("msg type: %T is not supported", msg)
}
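A minimal standalone sketch of the encode/decode round trip this hunk implies, assuming the protomem and types packages imported above. encodeTxs and decodeTxs are hypothetical helpers that mirror the broadcast routine's marshaling and the reactor's decodeMsg; they are not part of this codebase.
package main
import (
	"errors"
	"fmt"
	protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
	"github.com/tendermint/tendermint/types"
)
// encodeTxs wraps raw transactions in the mempool wire message, as
// broadcastTxRoutine does before peer.Send.
func encodeTxs(txs [][]byte) ([]byte, error) {
	msg := protomem.Message{
		Sum: &protomem.Message_Txs{Txs: &protomem.Txs{Txs: txs}},
	}
	return msg.Marshal()
}
// decodeTxs mirrors the reactor's decodeMsg: unmarshal the wire message
// and convert the payload into types.Tx values.
func decodeTxs(bz []byte) ([]types.Tx, error) {
	msg := protomem.Message{}
	if err := msg.Unmarshal(bz); err != nil {
		return nil, err
	}
	sum, ok := msg.Sum.(*protomem.Message_Txs)
	if !ok {
		return nil, fmt.Errorf("msg type: %T is not supported", msg)
	}
	txs := sum.Txs.GetTxs()
	if len(txs) == 0 {
		return nil, errors.New("empty TxsMessage")
	}
	decoded := make([]types.Tx, len(txs))
	for i, tx := range txs {
		decoded[i] = types.Tx(tx)
	}
	return decoded, nil
}
func main() {
	bz, err := encodeTxs([][]byte{{0x01, 0x02, 0x03}})
	if err != nil {
		panic(err)
	}
	txs, err := decodeTxs(bz)
	if err != nil {
		panic(err)
	}
	fmt.Printf("decoded %d tx(s), first: %X\n", len(txs), txs[0])
}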
//-------------------------------------
// TxsMessage is a Message containing transactions.
type TxsMessage struct {
Txs []types.Tx

View File

@@ -31,7 +31,7 @@ To replace the built-in p2p.Reactor, use the CustomReactors option:
dbProvider,
metricsProvider,
logger,
CustomReactors(map[string]p2p.Reactor{"BLOCKSYNC": customBlocksyncReactor}),
CustomReactors(map[string]p2p.Reactor{"BLOCKCHAIN": customBlockchainReactor}),
)
The list of existing reactors can be found in the CustomReactors documentation.

View File

@@ -146,7 +146,7 @@ type blockSyncReactor interface {
// result in replacing it with the custom one.
//
// - MEMPOOL
// - BLOCKSYNC
// - BLOCKCHAIN
// - CONSENSUS
// - EVIDENCE
// - PEX
@@ -303,7 +303,7 @@ func createAndStartIndexerService(
blockIndexer = &blockidxnull.BlockerIndexer{}
}
indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))
if err := indexerService.Start(); err != nil {
@@ -335,10 +335,8 @@ func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusL
// Log the version info.
logger.Info("Version info",
"tendermint_version", version.TMCoreSemVer,
"abci", version.ABCISemVer,
"block", version.BlockProtocol,
"p2p", version.P2PProtocol,
"commit_hash", version.TMGitCommitHash,
)
// If the state and software differ in block version, at least log it.
@@ -441,7 +439,7 @@ func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider,
return evidenceReactor, evidencePool, nil
}
func createBlocksyncReactor(config *cfg.Config,
func createBlockchainReactor(config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore *store.BlockStore,
@@ -457,7 +455,7 @@ func createBlocksyncReactor(config *cfg.Config,
return nil, fmt.Errorf("unknown fastsync version %s", config.BlockSync.Version)
}
bcReactor.SetLogger(logger.With("module", "blocksync"))
bcReactor.SetLogger(logger.With("module", "blockchain"))
return bcReactor, nil
}
@@ -584,7 +582,7 @@ func createSwitch(config *cfg.Config,
)
sw.SetLogger(p2pLogger)
sw.AddReactor("MEMPOOL", mempoolReactor)
sw.AddReactor("BLOCKSYNC", bcReactor)
sw.AddReactor("BLOCKCHAIN", bcReactor)
sw.AddReactor("CONSENSUS", consensusReactor)
sw.AddReactor("EVIDENCE", evidenceReactor)
sw.AddReactor("STATESYNC", stateSyncReactor)
@@ -803,7 +801,7 @@ func NewNode(config *cfg.Config,
return nil, err
}
// make block executor for consensus and blocksync reactors to execute blocks
// make block executor for consensus and blockchain reactors to execute blocks
blockExec := sm.NewBlockExecutor(
stateStore,
logger.With("module", "state"),
@@ -814,10 +812,10 @@ func NewNode(config *cfg.Config,
sm.BlockExecutorWithMetrics(smMetrics),
)
// Make BlocksyncReactor. Don't start block sync if we're doing a state sync first.
bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger)
// Make BlockchainReactor. Don't start block sync if we're doing a state sync first.
bcReactor, err := createBlockchainReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger)
if err != nil {
return nil, fmt.Errorf("could not create blocksync reactor: %w", err)
return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
}
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
@@ -990,7 +988,7 @@ func (n *Node) OnStart() error {
if n.stateSync {
bcR, ok := n.bcReactor.(blockSyncReactor)
if !ok {
return fmt.Errorf("this blocksync reactor does not support switching from state sync")
return fmt.Errorf("this blockchain reactor does not support switching from state sync")
}
err := startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, n.stateSyncProvider,
n.config.StateSync, n.config.BlockSyncMode, n.stateStore, n.blockStore, n.stateSyncGenesis)

View File

@@ -427,7 +427,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
RecvMessageCapacity: 100,
},
}
customBlocksyncReactor := p2pmock.NewReactor()
customBlockchainReactor := p2pmock.NewReactor()
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
require.NoError(t, err)
@@ -440,7 +440,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
DefaultDBProvider,
DefaultMetricsProvider(config.Instrumentation),
log.TestingLogger(),
CustomReactors(map[string]p2p.Reactor{"FOO": cr, "BLOCKSYNC": customBlocksyncReactor}),
CustomReactors(map[string]p2p.Reactor{"FOO": cr, "BLOCKCHAIN": customBlockchainReactor}),
)
require.NoError(t, err)
@@ -451,8 +451,8 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
assert.True(t, cr.IsRunning())
assert.Equal(t, cr, n.Switch().Reactor("FOO"))
assert.True(t, customBlocksyncReactor.IsRunning())
assert.Equal(t, customBlocksyncReactor, n.Switch().Reactor("BLOCKSYNC"))
assert.True(t, customBlockchainReactor.IsRunning())
assert.Equal(t, customBlockchainReactor, n.Switch().Reactor("BLOCKCHAIN"))
channels := n.NodeInfo().(p2p.DefaultNodeInfo).Channels
assert.Contains(t, channels, mempl.MempoolChannel)

View File

@@ -38,13 +38,13 @@ type Reactor interface {
// or other reason).
RemovePeer(peer Peer, reason interface{})
// Receive is called by the switch when a message is received from the peer.
// Receive is called by the switch when msgBytes is received from the peer.
//
// NOTE reactor can not keep msgBytes around after Receive completes without
// copying.
//
// CONTRACT: msgBytes are not nil.
Receive(Envelope)
Receive(chID byte, peer Peer, msgBytes []byte)
}
//--------------------------------------
@@ -64,8 +64,8 @@ func NewBaseReactor(name string, impl Reactor) *BaseReactor {
func (br *BaseReactor) SetSwitch(sw *Switch) {
br.Switch = sw
}
func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil }
func (*BaseReactor) AddPeer(peer Peer) {}
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (*BaseReactor) Receive(e Envelope) {}
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }
func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil }
func (*BaseReactor) AddPeer(peer Peer) {}
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {}
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }
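As a rough illustration of the byte-oriented contract restored here, the sketch below embeds BaseReactor and overrides only GetChannels and Receive. EchoReactor and the 0x42 channel ID are invented for the example; a real reactor would be registered via sw.AddReactor as elsewhere in this diff.
package p2pexample
import (
	"fmt"
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/p2p/conn"
)
// EchoReactor receives raw msgBytes and echoes them back on the same channel.
type EchoReactor struct {
	p2p.BaseReactor
}
func NewEchoReactor() *EchoReactor {
	r := &EchoReactor{}
	r.BaseReactor = *p2p.NewBaseReactor("EchoReactor", r)
	return r
}
func (r *EchoReactor) GetChannels() []*conn.ChannelDescriptor {
	// 0x42 is an arbitrary example channel ID.
	return []*conn.ChannelDescriptor{{ID: 0x42, Priority: 1}}
}
// Receive copies msgBytes before reusing them, per the contract that a
// reactor cannot keep msgBytes around after Receive completes.
func (r *EchoReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
	cp := make([]byte, len(msgBytes))
	copy(cp, msgBytes)
	fmt.Printf("got %X on channel %#x from %v\n", cp, chID, peer.ID())
	peer.Send(chID, cp)
}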

View File

@@ -724,7 +724,6 @@ type ChannelDescriptor struct {
SendQueueCapacity int
RecvBufferCapacity int
RecvMessageCapacity int
MessageType proto.Message
}
func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {

View File

@@ -145,13 +145,6 @@ func (e ErrTransportClosed) Error() string {
return "transport has been closed"
}
// ErrPeerRemoval is raised when attempting to remove a peer results in an error.
type ErrPeerRemoval struct{}
func (e ErrPeerRemoval) Error() string {
return "peer removal failed"
}
//-------------------------------------------------------------------
type ErrNetAddressNoID struct {

View File

@@ -44,29 +44,15 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Name: "num_txs",
Help: "Number of transactions submitted by each peer.",
}, append(labels, "peer_id")).With(labelsAndValues...),
MessageReceiveBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "message_receive_bytes_total",
Help: "Number of bytes of each message type received.",
}, append(labels, "message_type")).With(labelsAndValues...),
MessageSendBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "message_send_bytes_total",
Help: "Number of bytes of each message type sent.",
}, append(labels, "message_type")).With(labelsAndValues...),
}
}
func NopMetrics() *Metrics {
return &Metrics{
Peers: discard.NewGauge(),
PeerReceiveBytesTotal: discard.NewCounter(),
PeerSendBytesTotal: discard.NewCounter(),
PeerPendingSendBytes: discard.NewGauge(),
NumTxs: discard.NewGauge(),
MessageReceiveBytesTotal: discard.NewCounter(),
MessageSendBytesTotal: discard.NewCounter(),
Peers: discard.NewGauge(),
PeerReceiveBytesTotal: discard.NewCounter(),
PeerSendBytesTotal: discard.NewCounter(),
PeerPendingSendBytes: discard.NewGauge(),
NumTxs: discard.NewGauge(),
}
}

View File

@@ -24,8 +24,4 @@ type Metrics struct {
PeerPendingSendBytes metrics.Gauge `metrics_labels:"peer_id"`
// Number of transactions submitted by each peer.
NumTxs metrics.Gauge `metrics_labels:"peer_id"`
// Number of bytes of each message type received.
MessageReceiveBytesTotal metrics.Counter `metrics_labels:"message_type"`
// Number of bytes of each message type sent.
MessageSendBytesTotal metrics.Counter `metrics_labels:"message_type"`
}

View File

@@ -42,9 +42,9 @@ func NewPeer(ip net.IP) *Peer {
return mp
}
func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error
func (mp *Peer) TrySend(e p2p.Envelope) bool { return true }
func (mp *Peer) Send(e p2p.Envelope) bool { return true }
func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error
func (mp *Peer) TrySend(chID byte, msgBytes []byte) bool { return true }
func (mp *Peer) Send(chID byte, msgBytes []byte) bool { return true }
func (mp *Peer) NodeInfo() p2p.NodeInfo {
return p2p.DefaultNodeInfo{
DefaultNodeID: mp.addr.ID,
@@ -68,5 +68,3 @@ func (mp *Peer) RemoteIP() net.IP { return mp.ip }
func (mp *Peer) SocketAddr() *p2p.NetAddress { return mp.addr }
func (mp *Peer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
func (mp *Peer) CloseConn() error { return nil }
func (mp *Peer) SetRemovalFailed() {}
func (mp *Peer) GetRemovalFailed() bool { return false }

View File

@@ -19,7 +19,7 @@ func NewReactor() *Reactor {
return r
}
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
func (r *Reactor) AddPeer(peer p2p.Peer) {}
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
func (r *Reactor) Receive(e p2p.Envelope) {}
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
func (r *Reactor) AddPeer(peer p2p.Peer) {}
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {}

View File

@@ -53,20 +53,6 @@ func (_m *Peer) Get(_a0 string) interface{} {
return r0
}
// GetRemovalFailed provides a mock function with given fields:
func (_m *Peer) GetRemovalFailed() bool {
ret := _m.Called()
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// ID provides a mock function with given fields:
func (_m *Peer) ID() p2p.ID {
ret := _m.Called()
@@ -234,13 +220,13 @@ func (_m *Peer) Reset() error {
return r0
}
// Send provides a mock function with given fields: _a0
func (_m *Peer) Send(_a0 p2p.Envelope) bool {
ret := _m.Called(_a0)
// Send provides a mock function with given fields: _a0, _a1
func (_m *Peer) Send(_a0 byte, _a1 []byte) bool {
ret := _m.Called(_a0, _a1)
var r0 bool
if rf, ok := ret.Get(0).(func(p2p.Envelope) bool); ok {
r0 = rf(_a0)
if rf, ok := ret.Get(0).(func(byte, []byte) bool); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Get(0).(bool)
}
@@ -258,11 +244,6 @@ func (_m *Peer) SetLogger(_a0 log.Logger) {
_m.Called(_a0)
}
// SetRemovalFailed provides a mock function with given fields:
func (_m *Peer) SetRemovalFailed() {
_m.Called()
}
// SocketAddr provides a mock function with given fields:
func (_m *Peer) SocketAddr() *p2p.NetAddress {
ret := _m.Called()
@@ -335,13 +316,13 @@ func (_m *Peer) String() string {
return r0
}
// TrySend provides a mock function with given fields: _a0
func (_m *Peer) TrySend(_a0 p2p.Envelope) bool {
ret := _m.Called(_a0)
// TrySend provides a mock function with given fields: _a0, _a1
func (_m *Peer) TrySend(_a0 byte, _a1 []byte) bool {
ret := _m.Called(_a0, _a1)
var r0 bool
if rf, ok := ret.Get(0).(func(p2p.Envelope) bool); ok {
r0 = rf(_a0)
if rf, ok := ret.Get(0).(func(byte, []byte) bool); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Get(0).(bool)
}

View File

@@ -5,8 +5,6 @@ import (
"net"
"time"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/libs/cmap"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
@@ -36,14 +34,11 @@ type Peer interface {
Status() tmconn.ConnectionStatus
SocketAddr() *NetAddress // actual address of the socket
Send(Envelope) bool
TrySend(Envelope) bool
Send(byte, []byte) bool
TrySend(byte, []byte) bool
Set(string, interface{})
Get(string) interface{}
SetRemovalFailed()
GetRemovalFailed() bool
}
//----------------------------------------------------------
@@ -122,9 +117,6 @@ type peer struct {
metrics *Metrics
metricsTicker *time.Ticker
// When removal of a peer fails, we set this flag
removalAttemptFailed bool
}
type PeerOption func(*peer)
@@ -134,7 +126,6 @@ func newPeer(
mConfig tmconn.MConnConfig,
nodeInfo NodeInfo,
reactorsByCh map[byte]Reactor,
msgTypeByChID map[byte]proto.Message,
chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
options ...PeerOption,
@@ -152,7 +143,6 @@ func newPeer(
pc.conn,
p,
reactorsByCh,
msgTypeByChID,
chDescs,
onPeerError,
mConfig,
@@ -253,31 +243,19 @@ func (p *peer) Status() tmconn.ConnectionStatus {
// Send msg bytes to the channel identified by chID byte. Returns false if the
// send queue is full after timeout, specified by MConnection.
func (p *peer) Send(e Envelope) bool {
func (p *peer) Send(chID byte, msgBytes []byte) bool {
if !p.IsRunning() {
// see Switch#Broadcast, where we fetch the list of peers and loop over
// them - while we're looping, one peer may be removed and stopped.
return false
} else if !p.hasChannel(e.ChannelID) {
} else if !p.hasChannel(chID) {
return false
}
msg := e.Message
if w, ok := msg.(Wrapper); ok {
var err error
msg, err = w.Wrap()
if err != nil {
panic(err)
}
}
msgBytes, err := proto.Marshal(msg)
if err != nil {
panic(err) // Q: should this panic or error?
}
res := p.mconn.Send(e.ChannelID, msgBytes)
res := p.mconn.Send(chID, msgBytes)
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", e.ChannelID),
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
}
@@ -286,29 +264,17 @@ func (p *peer) Send(e Envelope) bool {
// TrySend msg bytes to the channel identified by chID byte. Immediately returns
// false if the send queue is full.
func (p *peer) TrySend(e Envelope) bool {
func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(e.ChannelID) {
} else if !p.hasChannel(chID) {
return false
}
msg := e.Message
if w, ok := msg.(Wrapper); ok {
var err error
msg, err = w.Wrap()
if err != nil {
panic(err)
}
}
msgBytes, err := proto.Marshal(msg)
if err != nil {
panic(err)
}
res := p.mconn.TrySend(e.ChannelID, msgBytes)
res := p.mconn.TrySend(chID, msgBytes)
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", e.ChannelID),
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
}
@@ -350,14 +316,6 @@ func (p *peer) CloseConn() error {
return p.peerConn.conn.Close()
}
func (p *peer) SetRemovalFailed() {
p.removalAttemptFailed = true
}
func (p *peer) GetRemovalFailed() bool {
return p.removalAttemptFailed
}
//---------------------------------------------------
// methods only used for testing
// TODO: can we remove these?
@@ -412,7 +370,6 @@ func createMConnection(
conn net.Conn,
p *peer,
reactorsByCh map[byte]Reactor,
msgTypeByChID map[byte]proto.Message,
chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
config tmconn.MConnConfig,
@@ -425,31 +382,12 @@ func createMConnection(
// which does onPeerError.
panic(fmt.Sprintf("Unknown channel %X", chID))
}
mt := msgTypeByChID[chID]
msg := proto.Clone(mt)
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
// TODO(williambanfield) add a log line
return
}
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
if w, ok := msg.(Unwrapper); ok {
msg, err = w.Unwrap()
if err != nil {
// TODO(williambanfield) add error log line.
return
}
}
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
p.metrics.MessageReceiveBytesTotal.With("message_type", "tmp").Add(float64(len(msgBytes)))
reactor.Receive(Envelope{
ChannelID: chID,
Src: p,
Message: msg,
})
reactor.Receive(chID, p, msgBytes)
}
onError := func(r interface{}) {

View File

@@ -47,9 +47,6 @@ func (ps *PeerSet) Add(peer Peer) error {
if ps.lookup[peer.ID()] != nil {
return ErrSwitchDuplicatePeerID{peer.ID()}
}
if peer.GetRemovalFailed() {
return ErrPeerRemoval{}
}
index := len(ps.list)
// Appending is safe even with other goroutines
@@ -110,12 +107,6 @@ func (ps *PeerSet) Remove(peer Peer) bool {
item := ps.lookup[peer.ID()]
if item == nil {
// Removing the peer has failed so we set a flag to mark that a removal was attempted.
// This can happen when the peer add routine from the switch is running in
// parallel to the receive routine of MConn.
// There is an error within MConn but the switch has not actually added the peer to the peer set yet.
// Setting this flag will prevent a peer from being added to a node's peer set afterwards.
peer.SetRemovalFailed()
return false
}

View File

@@ -18,22 +18,20 @@ type mockPeer struct {
id ID
}
func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
func (mp *mockPeer) TrySend(e Envelope) bool { return true }
func (mp *mockPeer) Send(e Envelope) bool { return true }
func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} }
func (mp *mockPeer) Status() ConnectionStatus { return ConnectionStatus{} }
func (mp *mockPeer) ID() ID { return mp.id }
func (mp *mockPeer) IsOutbound() bool { return false }
func (mp *mockPeer) IsPersistent() bool { return true }
func (mp *mockPeer) Get(s string) interface{} { return s }
func (mp *mockPeer) Set(string, interface{}) {}
func (mp *mockPeer) RemoteIP() net.IP { return mp.ip }
func (mp *mockPeer) SocketAddr() *NetAddress { return nil }
func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
func (mp *mockPeer) CloseConn() error { return nil }
func (mp *mockPeer) SetRemovalFailed() {}
func (mp *mockPeer) GetRemovalFailed() bool { return false }
func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
func (mp *mockPeer) TrySend(chID byte, msgBytes []byte) bool { return true }
func (mp *mockPeer) Send(chID byte, msgBytes []byte) bool { return true }
func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} }
func (mp *mockPeer) Status() ConnectionStatus { return ConnectionStatus{} }
func (mp *mockPeer) ID() ID { return mp.id }
func (mp *mockPeer) IsOutbound() bool { return false }
func (mp *mockPeer) IsPersistent() bool { return true }
func (mp *mockPeer) Get(s string) interface{} { return s }
func (mp *mockPeer) Set(string, interface{}) {}
func (mp *mockPeer) RemoteIP() net.IP { return mp.ip }
func (mp *mockPeer) SocketAddr() *NetAddress { return nil }
func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
func (mp *mockPeer) CloseConn() error { return nil }
// Returns a mock peer
func newMockPeer(ip net.IP) *mockPeer {

View File

@@ -7,7 +7,6 @@ import (
"testing"
"time"
"github.com/cosmos/gogoproto/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -15,7 +14,6 @@ import (
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/proto/tendermint/p2p"
"github.com/tendermint/tendermint/config"
tmconn "github.com/tendermint/tendermint/p2p/conn"
@@ -72,7 +70,7 @@ func TestPeerSend(t *testing.T) {
})
assert.True(p.CanSend(testCh))
assert.True(p.Send(Envelope{ChannelID: testCh, Message: &p2p.Message{}}))
assert.True(p.Send(testCh, []byte("Asylum")))
}
func createOutboundPeerAndPerformHandshake(
@@ -84,9 +82,6 @@ func createOutboundPeerAndPerformHandshake(
{ID: testCh, Priority: 1},
}
reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)}
msgTypeByChID := map[byte]proto.Message{
testCh: &p2p.Message{},
}
pk := ed25519.GenPrivKey()
pc, err := testOutboundPeerConn(addr, config, false, pk)
if err != nil {
@@ -99,7 +94,7 @@ func createOutboundPeerAndPerformHandshake(
return nil, err
}
p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, msgTypeByChID, chDescs, func(p Peer, r interface{}) {})
p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, chDescs, func(p Peer, r interface{}) {})
p.SetLogger(log.TestingLogger().With("peer", addr))
return p, nil
}

View File

@@ -6,6 +6,8 @@ import (
"sync"
"time"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/libs/cmap"
tmmath "github.com/tendermint/tendermint/libs/math"
tmrand "github.com/tendermint/tendermint/libs/rand"
@@ -182,7 +184,6 @@ func (r *Reactor) GetChannels() []*conn.ChannelDescriptor {
Priority: 1,
SendQueueCapacity: 10,
RecvMessageCapacity: maxMsgSize,
MessageType: &tmp2p.Message{},
},
}
}
@@ -235,10 +236,16 @@ func (r *Reactor) logErrAddrBook(err error) {
}
// Receive implements Reactor by handling incoming PEX messages.
func (r *Reactor) Receive(e p2p.Envelope) {
r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
func (r *Reactor) Receive(chID byte, src Peer, msgBytes []byte) {
msg, err := decodeMsg(msgBytes)
if err != nil {
r.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
r.Switch.StopPeerForError(src, err)
return
}
r.Logger.Debug("Received message", "src", src, "chId", chID, "msg", msg)
switch msg := e.Message.(type) {
switch msg := msg.(type) {
case *tmp2p.PexRequest:
// NOTE: this is a prime candidate for amplification attacks,
@@ -248,8 +255,8 @@ func (r *Reactor) Receive(e p2p.Envelope) {
// If we're a seed and this is an inbound peer,
// respond once and disconnect.
if r.config.SeedMode && !e.Src.IsOutbound() {
id := string(e.Src.ID())
if r.config.SeedMode && !src.IsOutbound() {
id := string(src.ID())
v := r.lastReceivedRequests.Get(id)
if v != nil {
// FlushStop/StopPeer are already
@@ -259,36 +266,36 @@ func (r *Reactor) Receive(e p2p.Envelope) {
r.lastReceivedRequests.Set(id, time.Now())
// Send addrs and disconnect
r.SendAddrs(e.Src, r.book.GetSelectionWithBias(biasToSelectNewPeers))
r.SendAddrs(src, r.book.GetSelectionWithBias(biasToSelectNewPeers))
go func() {
// In a go-routine so it doesn't block .Receive.
e.Src.FlushStop()
r.Switch.StopPeerGracefully(e.Src)
src.FlushStop()
r.Switch.StopPeerGracefully(src)
}()
} else {
// Check we're not receiving requests too frequently.
if err := r.receiveRequest(e.Src); err != nil {
r.Switch.StopPeerForError(e.Src, err)
r.book.MarkBad(e.Src.SocketAddr(), defaultBanTime)
if err := r.receiveRequest(src); err != nil {
r.Switch.StopPeerForError(src, err)
r.book.MarkBad(src.SocketAddr(), defaultBanTime)
return
}
r.SendAddrs(e.Src, r.book.GetSelection())
r.SendAddrs(src, r.book.GetSelection())
}
case *tmp2p.PexAddrs:
// If we asked for addresses, add them to the book
addrs, err := p2p.NetAddressesFromProto(msg.Addrs)
if err != nil {
r.Switch.StopPeerForError(e.Src, err)
r.book.MarkBad(e.Src.SocketAddr(), defaultBanTime)
r.Switch.StopPeerForError(src, err)
r.book.MarkBad(src.SocketAddr(), defaultBanTime)
return
}
err = r.ReceiveAddrs(addrs, e.Src)
err = r.ReceiveAddrs(addrs, src)
if err != nil {
r.Switch.StopPeerForError(e.Src, err)
r.Switch.StopPeerForError(src, err)
if err == ErrUnsolicitedList {
r.book.MarkBad(e.Src.SocketAddr(), defaultBanTime)
r.book.MarkBad(src.SocketAddr(), defaultBanTime)
}
return
}
@@ -341,10 +348,7 @@ func (r *Reactor) RequestAddrs(p Peer) {
}
r.Logger.Debug("Request addrs", "from", p)
r.requestsSent.Set(id, struct{}{})
p.Send(p2p.Envelope{
ChannelID: PexChannel,
Message: &tmp2p.PexRequest{},
})
p.Send(PexChannel, mustEncode(&tmp2p.PexRequest{}))
}
// ReceiveAddrs adds the given addrs to the addrbook if there's an open
@@ -402,11 +406,7 @@ func (r *Reactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error {
// SendAddrs sends addrs to the peer.
func (r *Reactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) {
e := p2p.Envelope{
ChannelID: PexChannel,
Message: &tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)},
}
p.Send(e)
p.Send(PexChannel, mustEncode(&tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)}))
}
// SetEnsurePeersPeriod sets period to ensure peers connected.
@@ -763,3 +763,43 @@ func markAddrInBookBasedOnErr(addr *p2p.NetAddress, book AddrBook, err error) {
book.MarkAttempt(addr)
}
}
//-----------------------------------------------------------------------------
// Messages
// mustEncode wraps pb in a tmp2p.Message and proto-encodes it, panicking on failure
func mustEncode(pb proto.Message) []byte {
msg := tmp2p.Message{}
switch pb := pb.(type) {
case *tmp2p.PexRequest:
msg.Sum = &tmp2p.Message_PexRequest{PexRequest: pb}
case *tmp2p.PexAddrs:
msg.Sum = &tmp2p.Message_PexAddrs{PexAddrs: pb}
default:
panic(fmt.Sprintf("Unknown message type %T", pb))
}
bz, err := msg.Marshal()
if err != nil {
panic(fmt.Errorf("unable to marshal %T: %w", pb, err))
}
return bz
}
func decodeMsg(bz []byte) (proto.Message, error) {
pb := &tmp2p.Message{}
err := pb.Unmarshal(bz)
if err != nil {
return nil, err
}
switch msg := pb.Sum.(type) {
case *tmp2p.Message_PexRequest:
return msg.PexRequest, nil
case *tmp2p.Message_PexAddrs:
return msg.PexAddrs, nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}
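A small runnable sketch of the wire round trip these helpers perform, assuming only the tmp2p package. Because mustEncode and decodeMsg are unexported, the sketch inlines their equivalents.
package main
import (
	"fmt"
	tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
func main() {
	// Encode a PexRequest the way mustEncode does.
	wire := tmp2p.Message{Sum: &tmp2p.Message_PexRequest{PexRequest: &tmp2p.PexRequest{}}}
	bz, err := wire.Marshal()
	if err != nil {
		panic(err)
	}
	// Decode it back, mirroring decodeMsg's type switch.
	pb := &tmp2p.Message{}
	if err := pb.Unmarshal(bz); err != nil {
		panic(err)
	}
	switch msg := pb.Sum.(type) {
	case *tmp2p.Message_PexRequest:
		fmt.Println("got PexRequest")
	case *tmp2p.Message_PexAddrs:
		fmt.Printf("got %d addrs\n", len(msg.PexAddrs.Addrs))
	default:
		fmt.Printf("unknown message: %T\n", msg)
	}
}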

View File

@@ -131,11 +131,12 @@ func TestPEXReactorReceive(t *testing.T) {
r.RequestAddrs(peer)
size := book.Size()
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
r.Receive(PexChannel, peer, msg)
assert.Equal(t, size+1, book.Size())
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
msg = mustEncode(&tmp2p.PexRequest{})
r.Receive(PexChannel, peer, msg) // should not panic.
}
func TestPEXReactorRequestMessageAbuse(t *testing.T) {
@@ -154,19 +155,20 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) {
require.True(t, book.HasAddress(peerAddr))
id := string(peer.ID())
msg := mustEncode(&tmp2p.PexRequest{})
// first time creates the entry
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
r.Receive(PexChannel, peer, msg)
assert.True(t, r.lastReceivedRequests.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// next time sets the last time value
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
r.Receive(PexChannel, peer, msg)
assert.True(t, r.lastReceivedRequests.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// third time is too many too soon - peer is removed
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
r.Receive(PexChannel, peer, msg)
assert.False(t, r.lastReceivedRequests.Has(id))
assert.False(t, sw.Peers().Has(peer.ID()))
assert.True(t, book.IsBanned(peerAddr))
@@ -190,15 +192,15 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
assert.True(t, r.requestsSent.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
// receive some addrs. should clear the request
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
r.Receive(PexChannel, peer, msg)
assert.False(t, r.requestsSent.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// receiving more unsolicited addrs causes a disconnect and ban
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
r.Receive(PexChannel, peer, msg)
assert.False(t, sw.Peers().Has(peer.ID()))
assert.True(t, book.IsBanned(peer.SocketAddr()))
}
@@ -484,12 +486,8 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
pexR.RequestAddrs(peer)
size := book.Size()
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
pexR.Receive(p2p.Envelope{
ChannelID: PexChannel,
Src: peer,
Message: msg,
})
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
pexR.Receive(PexChannel, peer, msg)
assert.Equal(t, size, book.Size())
pexR.AddPeer(peer)
@@ -697,10 +695,7 @@ func TestPexVectors(t *testing.T) {
for _, tc := range testCases {
tc := tc
w, err := tc.msg.(p2p.Wrapper).Wrap()
require.NoError(t, err)
bz, err := proto.Marshal(w)
require.NoError(t, err)
bz := mustEncode(tc.msg)
require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
}

View File

@@ -6,9 +6,9 @@ import (
"sync"
"time"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/cmap"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/p2p/conn"
@@ -69,17 +69,16 @@ type PeerFilterFunc func(IPeerSet, Peer) error
type Switch struct {
service.BaseService
config *config.P2PConfig
reactors map[string]Reactor
chDescs []*conn.ChannelDescriptor
reactorsByCh map[byte]Reactor
msgTypeByChID map[byte]proto.Message
peers *PeerSet
dialing *cmap.CMap
reconnecting *cmap.CMap
nodeInfo NodeInfo // our node info
nodeKey *NodeKey // our node privkey
addrBook AddrBook
config *config.P2PConfig
reactors map[string]Reactor
chDescs []*conn.ChannelDescriptor
reactorsByCh map[byte]Reactor
peers *PeerSet
dialing *cmap.CMap
reconnecting *cmap.CMap
nodeInfo NodeInfo // our node info
nodeKey *NodeKey // our node privkey
addrBook AddrBook
// peers addresses with whom we'll maintain constant connection
persistentPeersAddrs []*NetAddress
unconditionalPeerIDs map[ID]struct{}
@@ -114,7 +113,6 @@ func NewSwitch(
reactors: make(map[string]Reactor),
chDescs: make([]*conn.ChannelDescriptor, 0),
reactorsByCh: make(map[byte]Reactor),
msgTypeByChID: make(map[byte]proto.Message),
peers: NewPeerSet(),
dialing: cmap.NewCMap(),
reconnecting: cmap.NewCMap(),
@@ -166,7 +164,6 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
}
sw.chDescs = append(sw.chDescs, chDesc)
sw.reactorsByCh[chID] = reactor
sw.msgTypeByChID[chID] = chDesc.MessageType
}
sw.reactors[name] = reactor
reactor.SetSwitch(sw)
@@ -185,7 +182,6 @@ func (sw *Switch) RemoveReactor(name string, reactor Reactor) {
}
}
delete(sw.reactorsByCh, chDesc.ID)
delete(sw.msgTypeByChID, chDesc.ID)
}
delete(sw.reactors, name)
reactor.SetSwitch(nil)
@@ -265,8 +261,8 @@ func (sw *Switch) OnStop() {
// closed once msg bytes are sent to all peers (or time out).
//
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
func (sw *Switch) Broadcast(e Envelope) chan bool {
sw.Logger.Debug("Broadcast", "channel", e.ChannelID)
func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", log.NewLazySprintf("%X", msgBytes))
peers := sw.peers.List()
var wg sync.WaitGroup
@@ -276,7 +272,7 @@ func (sw *Switch) Broadcast(e Envelope) chan bool {
for _, peer := range peers {
go func(p Peer) {
defer wg.Done()
success := p.Send(e)
success := p.Send(chID, msgBytes)
successChan <- success
}(peer)
}
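A hedged sketch of the byte-based Broadcast from the caller's side: marshal once, hand the same bytes to every peer, and drain the success channel. BroadcastPexRequest is a hypothetical helper, and byte(0x00) stands in for pex.PexChannel.
package p2pexample
import (
	"github.com/tendermint/tendermint/p2p"
	tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
// BroadcastPexRequest reports whether the request reached every peer.
func BroadcastPexRequest(sw *p2p.Switch) bool {
	msg := tmp2p.Message{Sum: &tmp2p.Message_PexRequest{PexRequest: &tmp2p.PexRequest{}}}
	bz, err := msg.Marshal()
	if err != nil {
		panic(err) // marshaling a fixed message should never fail
	}
	ok := true
	// Broadcast fans the bytes out in per-peer goroutines; the channel is
	// closed once every peer has reported success or failure.
	for success := range sw.Broadcast(byte(0x00), bz) {
		ok = ok && success
	}
	return ok
}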
@@ -374,10 +370,6 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
// https://github.com/tendermint/tendermint/issues/3338
if sw.peers.Remove(peer) {
sw.metrics.Peers.Add(float64(-1))
} else {
// Removal of the peer has failed. The function above sets a flag within the peer to mark this.
// We keep this message here as information to the developer.
sw.Logger.Debug("error on peer removal", ",", "peer", peer.ID())
}
}
@@ -627,12 +619,11 @@ func (sw *Switch) IsPeerPersistent(na *NetAddress) bool {
func (sw *Switch) acceptRoutine() {
for {
p, err := sw.transport.Accept(peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
reactorsByCh: sw.reactorsByCh,
msgTypeByChID: sw.msgTypeByChID,
metrics: sw.metrics,
isPersistent: sw.IsPeerPersistent,
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
reactorsByCh: sw.reactorsByCh,
metrics: sw.metrics,
isPersistent: sw.IsPeerPersistent,
})
if err != nil {
switch err := err.(type) {
@@ -731,12 +722,11 @@ func (sw *Switch) addOutboundPeerWithConfig(
}
p, err := sw.transport.Dial(*addr, peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
isPersistent: sw.IsPeerPersistent,
reactorsByCh: sw.reactorsByCh,
msgTypeByChID: sw.msgTypeByChID,
metrics: sw.metrics,
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
isPersistent: sw.IsPeerPersistent,
reactorsByCh: sw.reactorsByCh,
metrics: sw.metrics,
})
if err != nil {
if e, ok := err.(ErrRejected); ok {
@@ -834,12 +824,6 @@ func (sw *Switch) addPeer(p Peer) error {
// so that if Receive errors, we will find the peer and remove it.
// Add should not err since we already checked peers.Has().
if err := sw.peers.Add(p); err != nil {
switch err.(type) {
case ErrPeerRemoval:
sw.Logger.Error("Error starting peer ",
" err ", "Peer has already errored and removal was attempted.",
"peer", p.ID())
}
return err
}
sw.metrics.Peers.Add(float64(1))

View File

@@ -14,7 +14,6 @@ import (
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -24,8 +23,6 @@ import (
"github.com/tendermint/tendermint/libs/log"
tmsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/p2p/conn"
"github.com/tendermint/tendermint/proto/tendermint/p2p"
p2pproto "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
var (
@@ -39,8 +36,9 @@ func init() {
}
type PeerMessage struct {
Contents proto.Message
Counter int
PeerID ID
Bytes []byte
Counter int
}
type TestReactor struct {
@@ -72,12 +70,12 @@ func (tr *TestReactor) AddPeer(peer Peer) {}
func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {}
func (tr *TestReactor) Receive(e Envelope) {
func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
if tr.logMessages {
tr.mtx.Lock()
defer tr.mtx.Unlock()
fmt.Printf("Received: %X, %X\n", e.ChannelID, e.Message)
tr.msgsReceived[e.ChannelID] = append(tr.msgsReceived[e.ChannelID], PeerMessage{Contents: e.Message, Counter: tr.msgsCounter})
// fmt.Printf("Received: %X, %X\n", chID, msgBytes)
tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
tr.msgsCounter++
}
}
@@ -105,12 +103,12 @@ func initSwitchFunc(i int, sw *Switch) *Switch {
// Make two reactors of two channels each
sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
{ID: byte(0x00), Priority: 10, MessageType: &p2pproto.Message{}},
{ID: byte(0x01), Priority: 10, MessageType: &p2pproto.Message{}},
{ID: byte(0x00), Priority: 10},
{ID: byte(0x01), Priority: 10},
}, true))
sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
{ID: byte(0x02), Priority: 10, MessageType: &p2pproto.Message{}},
{ID: byte(0x03), Priority: 10, MessageType: &p2pproto.Message{}},
{ID: byte(0x02), Priority: 10},
{ID: byte(0x03), Priority: 10},
}, true))
return sw
@@ -137,47 +135,31 @@ func TestSwitches(t *testing.T) {
}
// Lets send some messages
ch0Msg := &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "1",
},
},
}
ch1Msg := &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "1",
},
},
}
ch2Msg := &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "2",
},
},
}
s1.Broadcast(Envelope{ChannelID: byte(0x00), Message: ch0Msg})
s1.Broadcast(Envelope{ChannelID: byte(0x01), Message: ch1Msg})
s1.Broadcast(Envelope{ChannelID: byte(0x02), Message: ch2Msg})
ch0Msg := []byte("channel zero")
ch1Msg := []byte("channel foo")
ch2Msg := []byte("channel bar")
s1.Broadcast(byte(0x00), ch0Msg)
s1.Broadcast(byte(0x01), ch1Msg)
s1.Broadcast(byte(0x02), ch2Msg)
assertMsgReceivedWithTimeout(t,
ch0Msg,
byte(0x00),
s2.Reactor("foo").(*TestReactor), 200*time.Millisecond, 5*time.Second)
s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
assertMsgReceivedWithTimeout(t,
ch1Msg,
byte(0x01),
s2.Reactor("foo").(*TestReactor), 200*time.Millisecond, 5*time.Second)
s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
assertMsgReceivedWithTimeout(t,
ch2Msg,
byte(0x02),
s2.Reactor("bar").(*TestReactor), 200*time.Millisecond, 5*time.Second)
s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
}
func assertMsgReceivedWithTimeout(
t *testing.T,
msg proto.Message,
msgBytes []byte,
channel byte,
reactor *TestReactor,
checkPeriod,
@@ -188,13 +170,9 @@ func assertMsgReceivedWithTimeout(
select {
case <-ticker.C:
msgs := reactor.getMsgs(channel)
expectedBytes, err := proto.Marshal(msgs[0].Contents)
require.NoError(t, err)
gotBytes, err := proto.Marshal(msg)
require.NoError(t, err)
if len(msgs) > 0 {
if !bytes.Equal(expectedBytes, gotBytes) {
t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msg, msgs[0].Counter)
if !bytes.Equal(msgs[0].Bytes, msgBytes) {
t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msgBytes, msgs[0].Bytes)
}
return
}
@@ -451,10 +429,7 @@ func TestSwitchStopPeerForError(t *testing.T) {
// send messages to the peer from sw1
p := sw1.Peers().List()[0]
p.Send(Envelope{
ChannelID: 0x1,
Message: &p2p.Message{},
})
p.Send(0x1, []byte("here's a message to send"))
// stop sw2. this should cause the p to fail,
// which results in calling StopPeerForError internally
@@ -849,7 +824,7 @@ func BenchmarkSwitchBroadcast(b *testing.B) {
// Send random message from foo channel to another
for i := 0; i < b.N; i++ {
chID := byte(i % 4)
successChan := s1.Broadcast(Envelope{ChannelID: chID})
successChan := s1.Broadcast(chID, []byte("test data"))
for s := range successChan {
if s {
numSuccess++
@@ -861,16 +836,3 @@ func BenchmarkSwitchBroadcast(b *testing.B) {
b.Logf("success: %v, failure: %v", numSuccess, numFailure)
}
func TestSwitchRemovalErr(t *testing.T) {
sw1, sw2 := MakeSwitchPair(t, func(i int, sw *Switch) *Switch {
return initSwitchFunc(i, sw)
})
assert.Equal(t, len(sw1.Peers().List()), 1)
p := sw1.Peers().List()[0]
sw2.StopPeerForError(p, fmt.Errorf("peer should error"))
assert.Equal(t, sw2.peers.Add(p).Error(), ErrPeerRemoval{}.Error())
}

View File

@@ -149,7 +149,6 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
MConnConfig(sw.config),
ni,
sw.reactorsByCh,
sw.msgTypeByChID,
sw.chDescs,
sw.StopPeerForError,
)

View File

@@ -8,7 +8,6 @@ import (
"golang.org/x/net/netutil"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/libs/protoio"
"github.com/tendermint/tendermint/p2p/conn"
@@ -48,10 +47,9 @@ type peerConfig struct {
// isPersistent allows you to set a function, which, given socket address
// (for outbound peers) OR self-reported address (for inbound peers), tells
// if the peer is persistent or not.
isPersistent func(*NetAddress) bool
reactorsByCh map[byte]Reactor
msgTypeByChID map[byte]proto.Message
metrics *Metrics
isPersistent func(*NetAddress) bool
reactorsByCh map[byte]Reactor
metrics *Metrics
}
// Transport emits and connects to Peers. The implementation of Peer is left to
@@ -521,7 +519,6 @@ func (mt *MultiplexTransport) wrapPeer(
mt.mConfig,
ni,
cfg.reactorsByCh,
cfg.msgTypeByChID,
cfg.chDescs,
cfg.onPeerError,
PeerMetrics(cfg.metrics),

View File

@@ -1,40 +1,8 @@
package p2p
import (
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p/conn"
tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
type ChannelDescriptor = conn.ChannelDescriptor
type ConnectionStatus = conn.ConnectionStatus
// Envelope contains a message with sender routing info.
type Envelope struct {
Src Peer // sender (empty if outbound)
Message proto.Message // message payload
ChannelID byte
}
// Wrapper is a Protobuf message that can contain a variety of inner messages
// (e.g. via oneof fields). If a Channel's message type implements Wrapper, the
// Router will automatically wrap outbound messages and unwrap inbound messages,
// such that reactors do not have to do this themselves.
type Unwrapper interface {
proto.Message
// Unwrap will unwrap the inner message contained in this message.
Unwrap() (proto.Message, error)
}
type Wrapper interface {
proto.Message
// Wrap will take the underlying message and wrap it in its wrapper type.
Wrap() (proto.Message, error)
}
var (
_ Wrapper = &tmp2p.PexRequest{}
_ Wrapper = &tmp2p.PexAddrs{}
)

View File

@@ -1,73 +0,0 @@
package blocksync
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p"
)
var _ p2p.Wrapper = &StatusRequest{}
var _ p2p.Wrapper = &StatusResponse{}
var _ p2p.Wrapper = &NoBlockResponse{}
var _ p2p.Wrapper = &BlockResponse{}
var _ p2p.Wrapper = &BlockRequest{}
const (
BlockResponseMessagePrefixSize = 4
BlockResponseMessageFieldKeySize = 1
)
func (m *BlockRequest) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_BlockRequest{BlockRequest: m}
return bm, nil
}
func (m *BlockResponse) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_BlockResponse{BlockResponse: m}
return bm, nil
}
func (m *NoBlockResponse) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_NoBlockResponse{NoBlockResponse: m}
return bm, nil
}
func (m *StatusRequest) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_StatusRequest{StatusRequest: m}
return bm, nil
}
func (m *StatusResponse) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_StatusResponse{StatusResponse: m}
return bm, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped blockchain
// message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_BlockRequest:
return m.GetBlockRequest(), nil
case *Message_BlockResponse:
return m.GetBlockResponse(), nil
case *Message_NoBlockResponse:
return m.GetNoBlockResponse(), nil
case *Message_StatusRequest:
return m.GetStatusRequest(), nil
case *Message_StatusResponse:
return m.GetStatusResponse(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -1,109 +0,0 @@
package consensus
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p"
)
var _ p2p.Wrapper = &VoteSetBits{}
var _ p2p.Wrapper = &VoteSetMaj23{}
var _ p2p.Wrapper = &Vote{}
var _ p2p.Wrapper = &ProposalPOL{}
var _ p2p.Wrapper = &Proposal{}
var _ p2p.Wrapper = &NewValidBlock{}
var _ p2p.Wrapper = &NewRoundStep{}
var _ p2p.Wrapper = &HasVote{}
var _ p2p.Wrapper = &BlockPart{}
func (m *VoteSetBits) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_VoteSetBits{VoteSetBits: m}
return cm, nil
}
func (m *VoteSetMaj23) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_VoteSetMaj23{VoteSetMaj23: m}
return cm, nil
}
func (m *HasVote) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_HasVote{HasVote: m}
return cm, nil
}
func (m *Vote) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_Vote{Vote: m}
return cm, nil
}
func (m *BlockPart) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_BlockPart{BlockPart: m}
return cm, nil
}
func (m *ProposalPOL) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_ProposalPol{ProposalPol: m}
return cm, nil
}
func (m *Proposal) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_Proposal{Proposal: m}
return cm, nil
}
func (m *NewValidBlock) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_NewValidBlock{NewValidBlock: m}
return cm, nil
}
func (m *NewRoundStep) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_NewRoundStep{NewRoundStep: m}
return cm, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped consensus
// proto message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_NewRoundStep:
return m.GetNewRoundStep(), nil
case *Message_NewValidBlock:
return m.GetNewValidBlock(), nil
case *Message_Proposal:
return m.GetProposal(), nil
case *Message_ProposalPol:
return m.GetProposalPol(), nil
case *Message_BlockPart:
return m.GetBlockPart(), nil
case *Message_Vote:
return m.GetVote(), nil
case *Message_HasVote:
return m.GetHasVote(), nil
case *Message_VoteSetMaj23:
return m.GetVoteSetMaj23(), nil
case *Message_VoteSetBits:
return m.GetVoteSetBits(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -1,30 +0,0 @@
package mempool
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p"
)
var _ p2p.Wrapper = &Txs{}
var _ p2p.Unwrapper = &Message{}
// Wrap implements the p2p Wrapper interface and wraps a mempool message.
func (m *Txs) Wrap() (proto.Message, error) {
return &Message{
Sum: &Message_Txs{Txs: m},
}, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped mempool
// message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_Txs:
return m.GetTxs(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -1,32 +0,0 @@
package p2p
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
)
func (m *PexAddrs) Wrap() (proto.Message, error) {
pm := &Message{}
pm.Sum = &Message_PexAddrs{PexAddrs: m}
return pm, nil
}
func (m *PexRequest) Wrap() (proto.Message, error) {
pm := &Message{}
pm.Sum = &Message_PexRequest{PexRequest: m}
return pm, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped PEX
// message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_PexRequest:
return msg.PexRequest, nil
case *Message_PexAddrs:
return msg.PexAddrs, nil
default:
return nil, fmt.Errorf("unknown pex message: %T", msg)
}
}

View File

@@ -1,58 +0,0 @@
package statesync
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p"
)
var _ p2p.Wrapper = &ChunkRequest{}
var _ p2p.Wrapper = &ChunkResponse{}
var _ p2p.Wrapper = &SnapshotsRequest{}
var _ p2p.Wrapper = &SnapshotsResponse{}
func (m *SnapshotsResponse) Wrap() (proto.Message, error) {
sm := &Message{}
sm.Sum = &Message_SnapshotsResponse{SnapshotsResponse: m}
return sm, nil
}
func (m *SnapshotsRequest) Wrap() (proto.Message, error) {
sm := &Message{}
sm.Sum = &Message_SnapshotsRequest{SnapshotsRequest: m}
return sm, nil
}
func (m *ChunkResponse) Wrap() (proto.Message, error) {
sm := &Message{}
sm.Sum = &Message_ChunkResponse{ChunkResponse: m}
return sm, nil
}
func (m *ChunkRequest) Wrap() (proto.Message, error) {
sm := &Message{}
sm.Sum = &Message_ChunkRequest{ChunkRequest: m}
return sm, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped state sync
// proto message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_ChunkRequest:
return m.GetChunkRequest(), nil
case *Message_ChunkResponse:
return m.GetChunkResponse(), nil
case *Message_SnapshotsRequest:
return m.GetSnapshotsRequest(), nil
case *Message_SnapshotsResponse:
return m.GetSnapshotsResponse(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -131,14 +131,13 @@ object nset = { # the type of node sets
object classic_bft = {
relation quorum_intersection
private {
definition [quorum_intersection_def] quorum_intersection = forall Q1,Q2. nset.is_quorum(Q1) & nset.is_quorum(Q2)
-> exists N. well_behaved(N) & nset.member(N, Q1) & nset.member(N, Q2) # every two quorums have a well-behaved node in common
definition [quorum_intersection_def] quorum_intersection = forall Q1,Q2. exists N. well_behaved(N) & nset.member(N, Q1) & nset.member(N, Q2) # every two quorums have a well-behaved node in common
}
}
trusted isolate accountable_bft = {
# this is our baseline assumption about quorums:
private {
property [max_2f_byzantine] nset.is_quorum(Q) -> exists N . well_behaved(N) & nset.member(N,Q) # every quorum has a well-behaved member
property [max_2f_byzantine] exists N . well_behaved(N) & nset.member(N,Q) # every quorum has a well-behaved member
}
}
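For orientation only, and as a paraphrase rather than part of the Ivy spec, the guarded reading of the two assumptions above is:
$$\forall Q_1, Q_2.\; \mathit{is\_quorum}(Q_1) \wedge \mathit{is\_quorum}(Q_2) \rightarrow \exists N.\; \mathit{well\_behaved}(N) \wedge N \in Q_1 \wedge N \in Q_2$$
$$\forall Q.\; \mathit{is\_quorum}(Q) \rightarrow \exists N.\; \mathit{well\_behaved}(N) \wedge N \in Q$$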

View File

@@ -264,7 +264,7 @@ func (blockExec *BlockExecutor) ApplyBlock(
// Prune old heights, if requested by ABCI app.
if retainHeight > 0 {
pruned, err := blockExec.pruneBlocks(retainHeight, state)
pruned, err := blockExec.pruneBlocks(retainHeight)
if err != nil {
blockExec.logger.Error("failed to prune blocks", "retain_height", retainHeight, "err", err)
} else {
@@ -642,20 +642,19 @@ func ExecCommitBlock(
return res.Data, nil
}
func (blockExec *BlockExecutor) pruneBlocks(retainHeight int64, state State) (uint64, error) {
func (blockExec *BlockExecutor) pruneBlocks(retainHeight int64) (uint64, error) {
base := blockExec.blockStore.Base()
if retainHeight <= base {
return 0, nil
}
amountPruned, prunedHeaderHeight, err := blockExec.blockStore.PruneBlocks(retainHeight, state)
pruned, err := blockExec.blockStore.PruneBlocks(retainHeight)
if err != nil {
return 0, fmt.Errorf("failed to prune block store: %w", err)
}
err = blockExec.Store().PruneStates(base, retainHeight, prunedHeaderHeight)
err = blockExec.Store().PruneStates(base, retainHeight)
if err != nil {
return 0, fmt.Errorf("failed to prune state store: %w", err)
}
return amountPruned, nil
return pruned, nil
}

View File

@@ -19,7 +19,6 @@ import (
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
// Register the Postgres database driver.
@@ -197,55 +196,6 @@ func TestIndexing(t *testing.T) {
err = indexer.IndexTxEvents([]*abci.TxResult{txResult})
require.NoError(t, err)
})
t.Run("IndexerService", func(t *testing.T) {
indexer := &EventSink{store: testDB(), chainID: chainID}
// event bus
eventBus := types.NewEventBus()
err := eventBus.Start()
require.NoError(t, err)
t.Cleanup(func() {
if err := eventBus.Stop(); err != nil {
t.Error(err)
}
})
service := txindex.NewIndexerService(indexer.TxIndexer(), indexer.BlockIndexer(), eventBus, true)
err = service.Start()
require.NoError(t, err)
t.Cleanup(func() {
if err := service.Stop(); err != nil {
t.Error(err)
}
})
// publish block with txs
err = eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
Header: types.Header{Height: 1},
NumTxs: int64(2),
})
require.NoError(t, err)
txResult1 := &abci.TxResult{
Height: 1,
Index: uint32(0),
Tx: types.Tx("foo"),
Result: abci.ResponseDeliverTx{Code: 0},
}
err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult1})
require.NoError(t, err)
txResult2 := &abci.TxResult{
Height: 1,
Index: uint32(1),
Tx: types.Tx("bar"),
Result: abci.ResponseDeliverTx{Code: 1},
}
err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult2})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
require.True(t, service.IsRunning())
})
}
func TestStop(t *testing.T) {

View File

@@ -4,7 +4,6 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
state "github.com/tendermint/tendermint/state"
types "github.com/tendermint/tendermint/types"
)
@@ -184,32 +183,25 @@ func (_m *BlockStore) LoadSeenCommit(height int64) *types.Commit {
return r0
}
// PruneBlocks provides a mock function with given fields: height, _a1
func (_m *BlockStore) PruneBlocks(height int64, _a1 state.State) (uint64, int64, error) {
ret := _m.Called(height, _a1)
// PruneBlocks provides a mock function with given fields: height
func (_m *BlockStore) PruneBlocks(height int64) (uint64, error) {
ret := _m.Called(height)
var r0 uint64
if rf, ok := ret.Get(0).(func(int64, state.State) uint64); ok {
r0 = rf(height, _a1)
if rf, ok := ret.Get(0).(func(int64) uint64); ok {
r0 = rf(height)
} else {
r0 = ret.Get(0).(uint64)
}
var r1 int64
if rf, ok := ret.Get(1).(func(int64, state.State) int64); ok {
r1 = rf(height, _a1)
var r1 error
if rf, ok := ret.Get(1).(func(int64) error); ok {
r1 = rf(height)
} else {
r1 = ret.Get(1).(int64)
r1 = ret.Error(1)
}
var r2 error
if rf, ok := ret.Get(2).(func(int64, state.State) error); ok {
r2 = rf(height, _a1)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
return r0, r1
}
// SaveBlock provides a mock function with given fields: block, blockParts, seenCommit

View File

@@ -197,13 +197,13 @@ func (_m *Store) LoadValidators(_a0 int64) (*types.ValidatorSet, error) {
return r0, r1
}
// PruneStates provides a mock function with given fields: _a0, _a1, _a2
func (_m *Store) PruneStates(_a0 int64, _a1 int64, _a2 int64) error {
ret := _m.Called(_a0, _a1, _a2)
// PruneStates provides a mock function with given fields: _a0, _a1
func (_m *Store) PruneStates(_a0 int64, _a1 int64) error {
ret := _m.Called(_a0, _a1)
var r0 error
if rf, ok := ret.Get(0).(func(int64, int64, int64) error); ok {
r0 = rf(_a0, _a1, _a2)
if rf, ok := ret.Get(0).(func(int64, int64) error); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Error(0)
}

View File

@@ -26,7 +26,7 @@ type BlockStore interface {
SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit)
PruneBlocks(height int64, state State) (uint64, int64, error)
PruneBlocks(height int64) (uint64, error)
LoadBlockByHash(hash []byte) *types.Block
LoadBlockMetaByHash(hash []byte) *types.BlockMeta

View File

@@ -68,10 +68,10 @@ type Store interface {
Save(State) error
// SaveABCIResponses saves ABCIResponses for a given height
SaveABCIResponses(int64, *tmstate.ABCIResponses) error
// Bootstrap is used for bootstrapping state when not starting from an initial height
// Bootstrap is used for bootstrapping state when not starting from an initial height.
Bootstrap(State) error
// PruneStates takes the height from which to start pruning and the height at which to stop
PruneStates(int64, int64, int64) error
// PruneStates takes the height from which to start pruning and the height at which to stop
PruneStates(int64, int64) error
// Close closes the connection with the database
Close() error
}
@@ -237,15 +237,14 @@ func (store dbStore) Bootstrap(state State) error {
// encoding not preserving ordering: https://github.com/tendermint/tendermint/issues/4567
// This will cause some old states to be left behind when doing incremental partial prunes,
// specifically older checkpoints and LastHeightChanged targets.
func (store dbStore) PruneStates(from int64, to int64, evidenceThresholdHeight int64) error {
func (store dbStore) PruneStates(from int64, to int64) error {
if from <= 0 || to <= 0 {
return fmt.Errorf("from height %v and to height %v must be greater than 0", from, to)
}
if from >= to {
return fmt.Errorf("from height %v must be lower than to height %v", from, to)
}
valInfo, err := loadValidatorsInfo(store.db, min(to, evidenceThresholdHeight))
valInfo, err := loadValidatorsInfo(store.db, to)
if err != nil {
return fmt.Errorf("validators at height %v not found: %w", to, err)
}
@@ -299,14 +298,12 @@ func (store dbStore) PruneStates(from int64, to int64, evidenceThresholdHeight i
return err
}
}
} else if h < evidenceThresholdHeight {
} else {
err = batch.Delete(calcValidatorsKey(h))
if err != nil {
return err
}
}
// else we keep the validator set because we might need
// it later on for evidence verification
if keepParams[h] {
p, err := store.loadConsensusParamsInfo(h)
@@ -664,10 +661,3 @@ func (store dbStore) saveConsensusParamsInfo(nextHeight, changeHeight int64, par
func (store dbStore) Close() error {
return store.db.Close()
}
func min(a int64, b int64) int64 {
if a < b {
return a
}
return b
}

View File

@@ -88,25 +88,23 @@ func BenchmarkLoadValidators(b *testing.B) {
func TestPruneStates(t *testing.T) {
testcases := map[string]struct {
makeHeights int64
pruneFrom int64
pruneTo int64
evidenceThresholdHeight int64
expectErr bool
expectVals []int64
expectParams []int64
expectABCI []int64
makeHeights int64
pruneFrom int64
pruneTo int64
expectErr bool
expectVals []int64
expectParams []int64
expectABCI []int64
}{
"error on pruning from 0": {100, 0, 5, 100, true, nil, nil, nil},
"error when from > to": {100, 3, 2, 2, true, nil, nil, nil},
"error when from == to": {100, 3, 3, 3, true, nil, nil, nil},
"error when to does not exist": {100, 1, 101, 101, true, nil, nil, nil},
"prune all": {100, 1, 100, 100, false, []int64{93, 100}, []int64{95, 100}, []int64{100}},
"prune some": {10, 2, 8, 8, false, []int64{1, 3, 8, 9, 10},
"error on pruning from 0": {100, 0, 5, true, nil, nil, nil},
"error when from > to": {100, 3, 2, true, nil, nil, nil},
"error when from == to": {100, 3, 3, true, nil, nil, nil},
"error when to does not exist": {100, 1, 101, true, nil, nil, nil},
"prune all": {100, 1, 100, false, []int64{93, 100}, []int64{95, 100}, []int64{100}},
"prune some": {10, 2, 8, false, []int64{1, 3, 8, 9, 10},
[]int64{1, 5, 8, 9, 10}, []int64{1, 8, 9, 10}},
"prune across checkpoint": {100001, 1, 100001, 100001, false, []int64{99993, 100000, 100001},
"prune across checkpoint": {100001, 1, 100001, false, []int64{99993, 100000, 100001},
[]int64{99995, 100001}, []int64{100001}},
"prune when evidence height < height": {20, 1, 18, 17, false, []int64{13, 17, 18, 19, 20}, []int64{15, 18, 19, 20}, []int64{18, 19, 20}},
}
for name, tc := range testcases {
tc := tc
@@ -165,7 +163,7 @@ func TestPruneStates(t *testing.T) {
}
// Test assertions
err := stateStore.PruneStates(tc.pruneFrom, tc.pruneTo, tc.evidenceThresholdHeight)
err := stateStore.PruneStates(tc.pruneFrom, tc.pruneTo)
if tc.expectErr {
require.Error(t, err)
return

View File

@@ -3,6 +3,7 @@ package txindex
import (
"context"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
@@ -19,10 +20,9 @@ const (
type IndexerService struct {
service.BaseService
txIdxr TxIndexer
blockIdxr indexer.BlockIndexer
eventBus *types.EventBus
terminateOnError bool
txIdxr TxIndexer
blockIdxr indexer.BlockIndexer
eventBus *types.EventBus
}
// NewIndexerService returns a new service instance.
@@ -30,10 +30,9 @@ func NewIndexerService(
txIdxr TxIndexer,
blockIdxr indexer.BlockIndexer,
eventBus *types.EventBus,
terminateOnError bool,
) *IndexerService {
is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus, terminateOnError: terminateOnError}
is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus}
is.BaseService = *service.NewBaseService(nil, "IndexerService", is)
return is
}
@@ -75,38 +74,24 @@ func (is *IndexerService) OnStart() error {
"index", txResult.Index,
"err", err,
)
if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
}
return
}
}
}
if err := is.blockIdxr.Index(eventDataHeader); err != nil {
is.Logger.Error("failed to index block", "height", height, "err", err)
if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
}
return
}
} else {
is.Logger.Info("indexed block exents", "height", height)
is.Logger.Info("indexed block", "height", height)
}
batch.Ops, err = DeduplicateBatch(batch.Ops, is.txIdxr)
if err != nil {
is.Logger.Error("deduplicate batch", "height", height)
}
if err = is.txIdxr.AddBatch(batch); err != nil {
is.Logger.Error("failed to index block txs", "height", height, "err", err)
if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
}
return
}
} else {
is.Logger.Debug("indexed transactions", "height", height, "num_txs", eventDataHeader.NumTxs)
is.Logger.Debug("indexed block txs", "height", height, "num_txs", eventDataHeader.NumTxs)
}
}
}()
@@ -119,3 +104,45 @@ func (is *IndexerService) OnStop() {
_ = is.eventBus.UnsubscribeAll(context.Background(), subscriber)
}
}
// DeduplicateBatch considers the case of duplicate txs.
// If the current tx under investigation is NOT OK, we check
// whether a previously indexed tx exists for the same hash.
// SKIP the current tx if the previously indexed record is found and successful.
func DeduplicateBatch(ops []*abci.TxResult, txIdxr TxIndexer) ([]*abci.TxResult, error) {
result := make([]*abci.TxResult, 0, len(ops))
// keep track of successful txs in this block in order to suppress later duplicates from being indexed.
var successfulTxsInThisBlock = make(map[string]struct{})
for _, txResult := range ops {
hash := types.Tx(txResult.Tx).Hash()
if txResult.Result.IsOK() {
successfulTxsInThisBlock[string(hash)] = struct{}{}
} else {
// if it already appeared in current block and was successful, skip.
if _, found := successfulTxsInThisBlock[string(hash)]; found {
continue
}
// check if this tx hash is already indexed
old, err := txIdxr.Get(hash)
// bail out if the db op errored; a missing record is not an error
if err != nil {
return nil, err
}
// if it's already indexed in an older block and was successful, skip.
if old != nil && old.Result.Code == abci.CodeTypeOK {
continue
}
}
result = append(result, txResult)
}
return result, nil
}
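
A minimal usage sketch mirroring the call sequence in OnStart above; `indexBatch` is a hypothetical helper, not part of this diff:

```go
package example

import "github.com/tendermint/tendermint/state/txindex"

// indexBatch drops duplicate txs that already have a successful result,
// then hands the remaining ops to the indexer, as IndexerService does.
func indexBatch(txIdxr txindex.TxIndexer, batch *txindex.Batch) error {
	ops, err := txindex.DeduplicateBatch(batch.Ops, txIdxr)
	if err != nil {
		return err
	}
	batch.Ops = ops
	return txIdxr.AddBatch(batch)
}
```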

View File

@@ -32,7 +32,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
txIndexer := kv.NewTxIndex(store)
blockIndexer := blockidxkv.New(db.NewPrefixDB(store, []byte("block_events")))
service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
service.SetLogger(log.TestingLogger())
err = service.Start()
require.NoError(t, err)
@@ -79,3 +79,164 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
require.NoError(t, err)
require.Equal(t, txResult2, res)
}
func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) {
var mockTx = types.Tx("MOCK_TX_HASH")
testCases := []struct {
name string
tx1 abci.TxResult
tx2 abci.TxResult
expSkip bool // do we expect the second tx to be skipped by tx indexer
}{
{"skip, previously successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
true,
},
{"not skip, previously unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
false,
},
{"not skip, both successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
false,
},
{"not skip, both unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
false,
},
{"skip, same block, previously successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
true,
},
{"not skip, same block, previously unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
indexer := kv.NewTxIndex(db.NewMemDB())
if tc.tx1.Height != tc.tx2.Height {
// index the first tx
err := indexer.AddBatch(&txindex.Batch{
Ops: []*abci.TxResult{&tc.tx1},
})
require.NoError(t, err)
// check if the second one should be skipped.
ops, err := txindex.DeduplicateBatch([]*abci.TxResult{&tc.tx2}, indexer)
require.NoError(t, err)
if tc.expSkip {
require.Empty(t, ops)
} else {
require.Equal(t, []*abci.TxResult{&tc.tx2}, ops)
}
} else {
// same block
ops := []*abci.TxResult{&tc.tx1, &tc.tx2}
ops, err := txindex.DeduplicateBatch(ops, indexer)
require.NoError(t, err)
if tc.expSkip {
// the second one is skipped
require.Equal(t, []*abci.TxResult{&tc.tx1}, ops)
} else {
require.Equal(t, []*abci.TxResult{&tc.tx1, &tc.tx2}, ops)
}
}
})
}
}

View File

@@ -102,30 +102,12 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
// that indexed from the tx's events is a composite of the event type and the
// respective attribute's key delimited by a "." (eg. "account.number").
// Any event with an empty type is not indexed.
//
// If a transaction is indexed with the same hash as a previous transaction, it will
// be overwritten unless the tx result was NOT OK and the prior result was OK, i.e.
// transactions that executed successfully overwrite transactions that failed,
// and newer successful transactions overwrite older successful ones.
func (txi *TxIndex) Index(result *abci.TxResult) error {
b := txi.store.NewBatch()
defer b.Close()
hash := types.Tx(result.Tx).Hash()
if !result.Result.IsOK() {
oldResult, err := txi.Get(hash)
if err != nil {
return err
}
// if the new transaction failed but a successful version of it is already
// indexed in an older block, we skip it so that users get the older
// successful transaction when they query.
if oldResult != nil && oldResult.Result.Code == abci.CodeTypeOK {
return nil
}
}
// index tx by events
err := txi.indexEvents(result, hash, b)
if err != nil {

View File

@@ -258,103 +258,6 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {
}
}
func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) {
var mockTx = types.Tx("MOCK_TX_HASH")
testCases := []struct {
name string
tx1 *abci.TxResult
tx2 *abci.TxResult
expOverwrite bool // do we expect the second tx to overwrite the first tx
}{
{
"don't overwrite as a non-zero code was returned and the previous tx was successful",
&abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
&abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
false,
},
{
"overwrite as the previous tx was also unsuccessful",
&abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
&abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
true,
},
{
"overwrite as the most recent tx was successful",
&abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
&abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
true,
},
}
hash := mockTx.Hash()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
indexer := NewTxIndex(db.NewMemDB())
// index the first tx
err := indexer.Index(tc.tx1)
require.NoError(t, err)
// index the same tx with different results
err = indexer.Index(tc.tx2)
require.NoError(t, err)
res, err := indexer.Get(hash)
require.NoError(t, err)
if tc.expOverwrite {
require.Equal(t, tc.tx2, res)
} else {
require.Equal(t, tc.tx1, res)
}
})
}
}
func TestTxSearchMultipleTxs(t *testing.T) {
indexer := NewTxIndex(db.NewMemDB())

View File

@@ -16,6 +16,49 @@ const (
chunkMsgSize = int(16e6)
)
// mustEncodeMsg encodes a Protobuf message, panicking on error.
func mustEncodeMsg(pb proto.Message) []byte {
msg := ssproto.Message{}
switch pb := pb.(type) {
case *ssproto.ChunkRequest:
msg.Sum = &ssproto.Message_ChunkRequest{ChunkRequest: pb}
case *ssproto.ChunkResponse:
msg.Sum = &ssproto.Message_ChunkResponse{ChunkResponse: pb}
case *ssproto.SnapshotsRequest:
msg.Sum = &ssproto.Message_SnapshotsRequest{SnapshotsRequest: pb}
case *ssproto.SnapshotsResponse:
msg.Sum = &ssproto.Message_SnapshotsResponse{SnapshotsResponse: pb}
default:
panic(fmt.Errorf("unknown message type %T", pb))
}
bz, err := msg.Marshal()
if err != nil {
panic(fmt.Errorf("unable to marshal %T: %w", pb, err))
}
return bz
}
// decodeMsg decodes a Protobuf message.
func decodeMsg(bz []byte) (proto.Message, error) {
pb := &ssproto.Message{}
err := proto.Unmarshal(bz, pb)
if err != nil {
return nil, err
}
switch msg := pb.Sum.(type) {
case *ssproto.Message_ChunkRequest:
return msg.ChunkRequest, nil
case *ssproto.Message_ChunkResponse:
return msg.ChunkResponse, nil
case *ssproto.Message_SnapshotsRequest:
return msg.SnapshotsRequest, nil
case *ssproto.Message_SnapshotsResponse:
return msg.SnapshotsResponse, nil
default:
return nil, fmt.Errorf("unknown message type %T", msg)
}
}
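
A small round-trip sketch, assuming it sits next to these helpers in the statesync package so it can reuse the file's `fmt` and `ssproto` imports; `roundTripExample` is illustrative only:

```go
// roundTripExample encodes a SnapshotsRequest and decodes it again,
// which is what happens on the wire between two reactors.
func roundTripExample() (*ssproto.SnapshotsRequest, error) {
	bz := mustEncodeMsg(&ssproto.SnapshotsRequest{})
	msg, err := decodeMsg(bz)
	if err != nil {
		return nil, err
	}
	req, ok := msg.(*ssproto.SnapshotsRequest)
	if !ok {
		return nil, fmt.Errorf("unexpected message type %T", msg)
	}
	return req, nil
}
```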
// validateMsg validates a message.
func validateMsg(pb proto.Message) error {
if pb == nil {

View File

@@ -7,7 +7,6 @@ import (
"github.com/cosmos/gogoproto/proto"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/p2p"
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
)
@@ -100,10 +99,8 @@ func TestStateSyncVectors(t *testing.T) {
for _, tc := range testCases {
tc := tc
w, err := tc.msg.(p2p.Wrapper).Wrap()
require.NoError(t, err)
bz, err := proto.Marshal(w)
require.NoError(t, err)
bz := mustEncodeMsg(tc.msg)
require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
}

View File

@@ -66,14 +66,12 @@ func (r *Reactor) GetChannels() []*p2p.ChannelDescriptor {
Priority: 5,
SendQueueCapacity: 10,
RecvMessageCapacity: snapshotMsgSize,
MessageType: &ssproto.Message{},
},
{
ID: ChunkChannel,
Priority: 3,
SendQueueCapacity: 10,
RecvMessageCapacity: chunkMsgSize,
MessageType: &ssproto.Message{},
},
}
}
@@ -102,21 +100,27 @@ func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
}
// Receive implements p2p.Reactor.
func (r *Reactor) Receive(e p2p.Envelope) {
func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
if !r.IsRunning() {
return
}
err := validateMsg(e.Message)
msg, err := decodeMsg(msgBytes)
if err != nil {
r.Logger.Error("Invalid message", "peer", e.Src, "msg", e.Message, "err", err)
r.Switch.StopPeerForError(e.Src, err)
r.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
r.Switch.StopPeerForError(src, err)
return
}
err = validateMsg(msg)
if err != nil {
r.Logger.Error("Invalid message", "peer", src, "msg", msg, "err", err)
r.Switch.StopPeerForError(src, err)
return
}
switch e.ChannelID {
switch chID {
case SnapshotChannel:
switch msg := e.Message.(type) {
switch msg := msg.(type) {
case *ssproto.SnapshotsRequest:
snapshots, err := r.recentSnapshots(recentSnapshots)
if err != nil {
@@ -125,17 +129,14 @@ func (r *Reactor) Receive(e p2p.Envelope) {
}
for _, snapshot := range snapshots {
r.Logger.Debug("Advertising snapshot", "height", snapshot.Height,
"format", snapshot.Format, "peer", e.Src.ID())
e.Src.Send(p2p.Envelope{
ChannelID: e.ChannelID,
Message: &ssproto.SnapshotsResponse{
Height: snapshot.Height,
Format: snapshot.Format,
Chunks: snapshot.Chunks,
Hash: snapshot.Hash,
Metadata: snapshot.Metadata,
},
})
"format", snapshot.Format, "peer", src.ID())
src.Send(chID, mustEncodeMsg(&ssproto.SnapshotsResponse{
Height: snapshot.Height,
Format: snapshot.Format,
Chunks: snapshot.Chunks,
Hash: snapshot.Hash,
Metadata: snapshot.Metadata,
}))
}
case *ssproto.SnapshotsResponse:
@@ -145,8 +146,8 @@ func (r *Reactor) Receive(e p2p.Envelope) {
r.Logger.Debug("Received unexpected snapshot, no state sync in progress")
return
}
r.Logger.Debug("Received snapshot", "height", msg.Height, "format", msg.Format, "peer", e.Src.ID())
_, err := r.syncer.AddSnapshot(e.Src, &snapshot{
r.Logger.Debug("Received snapshot", "height", msg.Height, "format", msg.Format, "peer", src.ID())
_, err := r.syncer.AddSnapshot(src, &snapshot{
Height: msg.Height,
Format: msg.Format,
Chunks: msg.Chunks,
@@ -156,7 +157,7 @@ func (r *Reactor) Receive(e p2p.Envelope) {
// TODO: We may want to consider punishing the peer for certain errors
if err != nil {
r.Logger.Error("Failed to add snapshot", "height", msg.Height, "format", msg.Format,
"peer", e.Src.ID(), "err", err)
"peer", src.ID(), "err", err)
return
}
@@ -165,10 +166,10 @@ func (r *Reactor) Receive(e p2p.Envelope) {
}
case ChunkChannel:
switch msg := e.Message.(type) {
switch msg := msg.(type) {
case *ssproto.ChunkRequest:
r.Logger.Debug("Received chunk request", "height", msg.Height, "format", msg.Format,
"chunk", msg.Index, "peer", e.Src.ID())
"chunk", msg.Index, "peer", src.ID())
resp, err := r.conn.LoadSnapshotChunkSync(abci.RequestLoadSnapshotChunk{
Height: msg.Height,
Format: msg.Format,
@@ -180,33 +181,30 @@ func (r *Reactor) Receive(e p2p.Envelope) {
return
}
r.Logger.Debug("Sending chunk", "height", msg.Height, "format", msg.Format,
"chunk", msg.Index, "peer", e.Src.ID())
e.Src.Send(p2p.Envelope{
ChannelID: ChunkChannel,
Message: &ssproto.ChunkResponse{
Height: msg.Height,
Format: msg.Format,
Index: msg.Index,
Chunk: resp.Chunk,
Missing: resp.Chunk == nil,
},
})
"chunk", msg.Index, "peer", src.ID())
src.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkResponse{
Height: msg.Height,
Format: msg.Format,
Index: msg.Index,
Chunk: resp.Chunk,
Missing: resp.Chunk == nil,
}))
case *ssproto.ChunkResponse:
r.mtx.RLock()
defer r.mtx.RUnlock()
if r.syncer == nil {
r.Logger.Debug("Received unexpected chunk, no state sync in progress", "peer", e.Src.ID())
r.Logger.Debug("Received unexpected chunk, no state sync in progress", "peer", src.ID())
return
}
r.Logger.Debug("Received chunk, adding to sync", "height", msg.Height, "format", msg.Format,
"chunk", msg.Index, "peer", e.Src.ID())
"chunk", msg.Index, "peer", src.ID())
_, err := r.syncer.AddChunk(&chunk{
Height: msg.Height,
Format: msg.Format,
Index: msg.Index,
Chunk: msg.Chunk,
Sender: e.Src.ID(),
Sender: src.ID(),
})
if err != nil {
r.Logger.Error("Failed to add chunk", "height", msg.Height, "format", msg.Format,
@@ -219,7 +217,7 @@ func (r *Reactor) Receive(e p2p.Envelope) {
}
default:
r.Logger.Error("Received message on invalid channel %x", e.ChannelID)
r.Logger.Error("Received message on invalid channel %x", chID)
}
}
@@ -271,11 +269,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
hook := func() {
r.Logger.Debug("Requesting snapshots from known peers")
// Request snapshots from all currently connected peers
r.Switch.NewBroadcast(p2p.Envelope{
ChannelID: SnapshotChannel,
Message: &ssproto.SnapshotsRequest{},
})
r.Switch.Broadcast(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
}
hook()

View File

@@ -4,7 +4,6 @@ import (
"testing"
"time"
"github.com/cosmos/gogoproto/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@@ -54,18 +53,10 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
peer.On("ID").Return(p2p.ID("id"))
var response *ssproto.ChunkResponse
if tc.expectResponse != nil {
peer.On("Send", mock.MatchedBy(func(i interface{}) bool {
e, ok := i.(p2p.Envelope)
return ok && e.ChannelID == ChunkChannel
})).Run(func(args mock.Arguments) {
e := args[0].(p2p.Envelope)
// Marshal to simulate a wire roundtrip.
bz, err := proto.Marshal(e.Message)
peer.On("Send", ChunkChannel, mock.Anything).Run(func(args mock.Arguments) {
msg, err := decodeMsg(args[1].([]byte))
require.NoError(t, err)
err = proto.Unmarshal(bz, e.Message)
require.NoError(t, err)
response = e.Message.(*ssproto.ChunkResponse)
response = msg.(*ssproto.ChunkResponse)
}).Return(true)
}
@@ -80,11 +71,7 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
}
})
r.Receive(p2p.Envelope{
ChannelID: ChunkChannel,
Src: peer,
Message: tc.request,
})
r.Receive(ChunkChannel, peer, mustEncodeMsg(tc.request))
time.Sleep(100 * time.Millisecond)
assert.Equal(t, tc.expectResponse, response)
@@ -144,18 +131,10 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
peer := &p2pmocks.Peer{}
if len(tc.expectResponses) > 0 {
peer.On("ID").Return(p2p.ID("id"))
peer.On("Send", mock.MatchedBy(func(i interface{}) bool {
e, ok := i.(p2p.Envelope)
return ok && e.ChannelID == SnapshotChannel
})).Run(func(args mock.Arguments) {
e := args[0].(p2p.Envelope)
// Marshal to simulate a wire roundtrip.
bz, err := proto.Marshal(e.Message)
peer.On("Send", SnapshotChannel, mock.Anything).Run(func(args mock.Arguments) {
msg, err := decodeMsg(args[1].([]byte))
require.NoError(t, err)
err = proto.Unmarshal(bz, e.Message)
require.NoError(t, err)
responses = append(responses, e.Message.(*ssproto.SnapshotsResponse))
responses = append(responses, msg.(*ssproto.SnapshotsResponse))
}).Return(true)
}
@@ -170,11 +149,7 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
}
})
r.Receive(p2p.Envelope{
ChannelID: SnapshotChannel,
Src: peer,
Message: &ssproto.SnapshotsRequest{},
})
r.Receive(SnapshotChannel, peer, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
time.Sleep(100 * time.Millisecond)
assert.Equal(t, tc.expectResponses, responses)

View File

@@ -126,11 +126,7 @@ func (s *syncer) AddSnapshot(peer p2p.Peer, snapshot *snapshot) (bool, error) {
// to discover snapshots, later we may want to do retries and stuff.
func (s *syncer) AddPeer(peer p2p.Peer) {
s.logger.Debug("Requesting snapshots from peer", "peer", peer.ID())
e := p2p.Envelope{
ChannelID: SnapshotChannel,
Message: &ssproto.SnapshotsRequest{},
}
peer.Send(e)
peer.Send(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
}
// RemovePeer removes a peer from the pool.
@@ -471,14 +467,11 @@ func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
}
s.logger.Debug("Requesting snapshot chunk", "height", snapshot.Height,
"format", snapshot.Format, "chunk", chunk, "peer", peer.ID())
peer.Send(p2p.Envelope{
ChannelID: ChunkChannel,
Message: &ssproto.ChunkRequest{
Height: snapshot.Height,
Format: snapshot.Format,
Index: chunk,
},
})
peer.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkRequest{
Height: snapshot.Height,
Format: snapshot.Format,
Index: chunk,
}))
}
// verifyApp verifies the sync, checking the app hash, last block height and app version

View File

@@ -98,27 +98,13 @@ func TestSyncer_SyncAny(t *testing.T) {
// Adding a couple of peers should trigger snapshot discovery messages
peerA := &p2pmocks.Peer{}
peerA.On("ID").Return(p2p.ID("a"))
peerA.On("Send", mock.MatchedBy(func(i interface{}) bool {
e, ok := i.(p2p.Envelope)
if !ok {
return false
}
req, ok := e.Message.(*ssproto.SnapshotsRequest)
return ok && e.ChannelID == SnapshotChannel && req != nil
})).Return(true)
peerA.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true)
syncer.AddPeer(peerA)
peerA.AssertExpectations(t)
peerB := &p2pmocks.Peer{}
peerB.On("ID").Return(p2p.ID("b"))
peerB.On("Send", mock.MatchedBy(func(i interface{}) bool {
e, ok := i.(p2p.Envelope)
if !ok {
return false
}
req, ok := e.Message.(*ssproto.SnapshotsRequest)
return ok && e.ChannelID == SnapshotChannel && req != nil
})).Return(true)
peerB.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true)
syncer.AddPeer(peerB)
peerB.AssertExpectations(t)
@@ -161,9 +147,9 @@ func TestSyncer_SyncAny(t *testing.T) {
chunkRequests := make(map[uint32]int)
chunkRequestsMtx := tmsync.Mutex{}
onChunkRequest := func(args mock.Arguments) {
e, ok := args[0].(p2p.Envelope)
require.True(t, ok)
msg := e.Message.(*ssproto.ChunkRequest)
pb, err := decodeMsg(args[1].([]byte))
require.NoError(t, err)
msg := pb.(*ssproto.ChunkRequest)
require.EqualValues(t, 1, msg.Height)
require.EqualValues(t, 1, msg.Format)
require.LessOrEqual(t, msg.Index, uint32(len(chunks)))
@@ -176,14 +162,8 @@ func TestSyncer_SyncAny(t *testing.T) {
chunkRequests[msg.Index]++
chunkRequestsMtx.Unlock()
}
peerA.On("Send", mock.MatchedBy(func(i interface{}) bool {
e, ok := i.(p2p.Envelope)
return ok && e.ChannelID == ChunkChannel
})).Maybe().Run(onChunkRequest).Return(true)
peerB.On("Send", mock.MatchedBy(func(i interface{}) bool {
e, ok := i.(p2p.Envelope)
return ok && e.ChannelID == ChunkChannel
})).Maybe().Run(onChunkRequest).Return(true)
peerA.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true)
peerB.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true)
// The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1,
// which should cause it to keep the existing chunk 0 and 2, and restart restoration from

View File

@@ -7,11 +7,9 @@ import (
"github.com/cosmos/gogoproto/proto"
dbm "github.com/tendermint/tm-db"
"github.com/tendermint/tendermint/evidence"
tmsync "github.com/tendermint/tendermint/libs/sync"
tmstore "github.com/tendermint/tendermint/proto/tendermint/store"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
@@ -266,20 +264,20 @@ func (bs *BlockStore) LoadSeenCommit(height int64) *types.Commit {
return commit
}
// PruneBlocks removes blocks up to (but not including) a height. It returns the number of blocks pruned and the evidence retain height - the height at which data needed to prove evidence must not be removed.
func (bs *BlockStore) PruneBlocks(height int64, state sm.State) (uint64, int64, error) {
// PruneBlocks removes blocks up to (but not including) a height. It returns the number of blocks pruned.
func (bs *BlockStore) PruneBlocks(height int64) (uint64, error) {
if height <= 0 {
return 0, -1, fmt.Errorf("height must be greater than 0")
return 0, fmt.Errorf("height must be greater than 0")
}
bs.mtx.RLock()
if height > bs.height {
bs.mtx.RUnlock()
return 0, -1, fmt.Errorf("cannot prune beyond the latest height %v", bs.height)
return 0, fmt.Errorf("cannot prune beyond the latest height %v", bs.height)
}
base := bs.base
bs.mtx.RUnlock()
if height < base {
return 0, -1, fmt.Errorf("cannot prune to height %v, it is lower than base height %v",
return 0, fmt.Errorf("cannot prune to height %v, it is lower than base height %v",
height, base)
}
@@ -302,42 +300,26 @@ func (bs *BlockStore) PruneBlocks(height int64, state sm.State) (uint64, int64,
return nil
}
evidencePoint := height
for h := base; h < height; h++ {
meta := bs.LoadBlockMeta(h)
if meta == nil { // assume already deleted
continue
}
// This logic is in place to protect data that proves malicious behavior.
// If the height is within the evidence age, we continue to persist the header and commit data.
if evidencePoint == height && !evidence.IsEvidenceExpired(state.LastBlockHeight, state.LastBlockTime, h, meta.Header.Time, state.ConsensusParams.Evidence) {
evidencePoint = h
}
// if the height is at or beyond the evidence point we don't delete the header
if h < evidencePoint {
if err := batch.Delete(calcBlockMetaKey(h)); err != nil {
return 0, -1, err
}
if err := batch.Delete(calcBlockMetaKey(h)); err != nil {
return 0, err
}
if err := batch.Delete(calcBlockHashKey(meta.BlockID.Hash)); err != nil {
return 0, -1, err
return 0, err
}
// if the height is at or beyond the evidence point we don't delete the commit data
if h < evidencePoint {
if err := batch.Delete(calcBlockCommitKey(h)); err != nil {
return 0, -1, err
}
if err := batch.Delete(calcBlockCommitKey(h)); err != nil {
return 0, err
}
if err := batch.Delete(calcSeenCommitKey(h)); err != nil {
return 0, -1, err
return 0, err
}
for p := 0; p < int(meta.BlockID.PartSetHeader.Total); p++ {
if err := batch.Delete(calcBlockPartKey(h, p)); err != nil {
return 0, -1, err
return 0, err
}
}
pruned++
@@ -346,7 +328,7 @@ func (bs *BlockStore) PruneBlocks(height int64, state sm.State) (uint64, int64,
if pruned%1000 == 0 && pruned > 0 {
err := flush(batch, h)
if err != nil {
return 0, -1, err
return 0, err
}
batch = bs.db.NewBatch()
defer batch.Close()
@@ -355,9 +337,9 @@ func (bs *BlockStore) PruneBlocks(height int64, state sm.State) (uint64, int64,
err := flush(batch, height)
if err != nil {
return 0, -1, err
return 0, err
}
return pruned, evidencePoint, nil
return pruned, nil
}
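
A usage sketch for the two-value signature, assuming same-package access to `BlockStore` and the file's existing `fmt` import; `pruneTo` is hypothetical:

```go
// pruneTo removes all blocks below the given retain height (the height
// itself is kept) and reports how many were deleted.
func pruneTo(bs *BlockStore, height int64) error {
	pruned, err := bs.PruneBlocks(height)
	if err != nil {
		return err
	}
	fmt.Printf("pruned %d blocks, new base is %d\n", pruned, bs.Base())
	return nil
}
```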
// SaveBlock persists the given block, blockParts, and seenCommit to the underlying db.

View File

@@ -381,7 +381,7 @@ func TestLoadBaseMeta(t *testing.T) {
bs.SaveBlock(block, partSet, seenCommit)
}
_, _, err = bs.PruneBlocks(4, state)
_, err = bs.PruneBlocks(4)
require.NoError(t, err)
baseBlock := bs.LoadBaseMeta()
@@ -440,10 +440,10 @@ func TestPruneBlocks(t *testing.T) {
assert.EqualValues(t, 0, bs.Size())
// pruning an empty store should error, even when pruning to 0
_, _, err = bs.PruneBlocks(1, state)
_, err = bs.PruneBlocks(1)
require.Error(t, err)
_, _, err = bs.PruneBlocks(0, state)
_, err = bs.PruneBlocks(0)
require.Error(t, err)
// make more than 1000 blocks, to test batch deletions
@@ -459,30 +459,27 @@ func TestPruneBlocks(t *testing.T) {
assert.EqualValues(t, 1500, bs.Height())
assert.EqualValues(t, 1500, bs.Size())
state.LastBlockTime = time.Date(2020, 1, 1, 1, 0, 0, 0, time.UTC)
state.LastBlockHeight = 1500
state.ConsensusParams.Evidence.MaxAgeNumBlocks = 400
state.ConsensusParams.Evidence.MaxAgeDuration = 1 * time.Second
prunedBlock := bs.LoadBlock(1199)
// Check that basic pruning works
pruned, evidenceRetainHeight, err := bs.PruneBlocks(1200, state)
pruned, err := bs.PruneBlocks(1200)
require.NoError(t, err)
assert.EqualValues(t, 1199, pruned)
assert.EqualValues(t, 1200, bs.Base())
assert.EqualValues(t, 1500, bs.Height())
assert.EqualValues(t, 301, bs.Size())
assert.EqualValues(t, 1100, evidenceRetainHeight)
assert.EqualValues(t, tmstore.BlockStoreState{
Base: 1200,
Height: 1500,
}, LoadBlockStoreState(db))
require.NotNil(t, bs.LoadBlock(1200))
require.Nil(t, bs.LoadBlock(1199))
// The header and commit for heights 1100 onwards
// need to remain to verify evidence
require.NotNil(t, bs.LoadBlockMeta(1100))
require.Nil(t, bs.LoadBlockMeta(1099))
require.NotNil(t, bs.LoadBlockCommit(1100))
require.Nil(t, bs.LoadBlockCommit(1099))
require.Nil(t, bs.LoadBlockByHash(prunedBlock.Hash()))
require.Nil(t, bs.LoadBlockCommit(1199))
require.Nil(t, bs.LoadBlockMeta(1199))
require.Nil(t, bs.LoadBlockMetaByHash(prunedBlock.Hash()))
require.Nil(t, bs.LoadBlockPart(1199, 1))
for i := int64(1); i < 1200; i++ {
require.Nil(t, bs.LoadBlock(i))
@@ -492,33 +489,26 @@ func TestPruneBlocks(t *testing.T) {
}
// Pruning below the current base should error
_, _, err = bs.PruneBlocks(1199, state)
_, err = bs.PruneBlocks(1199)
require.Error(t, err)
// Pruning to the current base should work
pruned, _, err = bs.PruneBlocks(1200, state)
pruned, err = bs.PruneBlocks(1200)
require.NoError(t, err)
assert.EqualValues(t, 0, pruned)
// Pruning again should work
pruned, _, err = bs.PruneBlocks(1300, state)
pruned, err = bs.PruneBlocks(1300)
require.NoError(t, err)
assert.EqualValues(t, 100, pruned)
assert.EqualValues(t, 1300, bs.Base())
// we should still have the header and the commit
// as they're needed for evidence
require.NotNil(t, bs.LoadBlockMeta(1100))
require.Nil(t, bs.LoadBlockMeta(1099))
require.NotNil(t, bs.LoadBlockCommit(1100))
require.Nil(t, bs.LoadBlockCommit(1099))
// Pruning beyond the current height should error
_, _, err = bs.PruneBlocks(1501, state)
_, err = bs.PruneBlocks(1501)
require.Error(t, err)
// Pruning to the current height should work
pruned, _, err = bs.PruneBlocks(1500, state)
pruned, err = bs.PruneBlocks(1500)
require.NoError(t, err)
assert.EqualValues(t, 200, pruned)
assert.Nil(t, bs.LoadBlock(1499))

View File

@@ -87,7 +87,7 @@ func toCSVRecords(rs []report.Report) [][]string {
}
res := make([][]string, total+1)
res[0] = []string{"experiment_id", "block_time", "duration_ns", "tx_hash", "connections", "rate", "size"}
res[0] = []string{"experiment_id", "duration_ns", "block_time", "connections", "rate", "size"}
offset := 1
for _, r := range rs {
idStr := r.ID.String()
@@ -95,7 +95,7 @@ func toCSVRecords(rs []report.Report) [][]string {
rateStr := strconv.FormatInt(int64(r.Rate), 10)
sizeStr := strconv.FormatInt(int64(r.Size), 10)
for i, v := range r.All {
res[offset+i] = []string{idStr, strconv.FormatInt(v.BlockTime.UnixNano(), 10), strconv.FormatInt(int64(v.Duration), 10), fmt.Sprintf("%X", v.Hash), connStr, rateStr, sizeStr}
res[offset+i] = []string{idStr, strconv.FormatInt(int64(v.Duration), 10), strconv.FormatInt(v.BlockTime.UnixNano(), 10), connStr, rateStr, sizeStr}
}
offset += len(r.All)
}
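
For context, a sketch of how the records above could be flushed with the standard library; `writeCSV` is illustrative and assumes same-package access to `toCSVRecords` plus `encoding/csv` and `os` imports:

```go
// writeCSV renders the report records, header first, in the new column
// order: experiment_id, duration_ns, block_time, connections, rate, size.
func writeCSV(rs []report.Report) error {
	w := csv.NewWriter(os.Stdout)
	// WriteAll flushes and returns any accumulated write error.
	return w.WriteAll(toCSVRecords(rs))
}
```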

View File

@@ -3,7 +3,7 @@ package payload
import (
"bytes"
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
"math"
@@ -12,7 +12,6 @@ import (
)
const keyPrefix = "a="
const maxPayloadSize = 4 * 1024 * 1024
// NewBytes generates a new payload and returns the encoded representation of
// the payload as a slice of bytes. NewBytes uses the fields on the Options
@@ -26,16 +25,10 @@ func NewBytes(p *Payload) ([]byte, error) {
if err != nil {
return nil, err
}
if p.Size > maxPayloadSize {
return nil, fmt.Errorf("configured size %d is too large (>%d)", p.Size, maxPayloadSize)
if p.Size < uint64(us) {
return nil, fmt.Errorf("configured size %d not large enough to fit unpadded transaction of size %d", p.Size, us)
}
pSize := int(p.Size) // #nosec -- The "if" above makes this cast safe
if pSize < us {
return nil, fmt.Errorf("configured size %d not large enough to fit unpadded transaction of size %d", pSize, us)
}
// We halve the padding size because we transform the TX to hex
p.Padding = make([]byte, (pSize-us)/2)
p.Padding = make([]byte, p.Size-uint64(us))
_, err = rand.Read(p.Padding)
if err != nil {
return nil, err
@@ -44,28 +37,22 @@ func NewBytes(p *Payload) ([]byte, error) {
if err != nil {
return nil, err
}
h := []byte(hex.EncodeToString(b))
// prepend a single key so that the kv store only ever stores a single
// transaction instead of storing all tx and ballooning in size.
return append([]byte(keyPrefix), h...), nil
return append([]byte(keyPrefix), b...), nil
}
// FromBytes extracts a payload from the byte representation of the payload.
// FromBytes leaves the padding untouched, returning it to the caller to handle
// or discard per their preference.
func FromBytes(b []byte) (*Payload, error) {
trH := bytes.TrimPrefix(b, []byte(keyPrefix))
if bytes.Equal(b, trH) {
return nil, fmt.Errorf("payload bytes missing key prefix '%s'", keyPrefix)
}
trB, err := hex.DecodeString(string(trH))
if err != nil {
return nil, err
}
p := &Payload{}
err = proto.Unmarshal(trB, p)
tr := bytes.TrimPrefix(b, []byte(keyPrefix))
if bytes.Equal(b, tr) {
return nil, errors.New("payload bytes missing key prefix")
}
err := proto.Unmarshal(tr, p)
if err != nil {
return nil, err
}
@@ -96,6 +83,5 @@ func CalculateUnpaddedSize(p *Payload) (int, error) {
if err != nil {
return 0, err
}
h := []byte(hex.EncodeToString(b))
return len(h) + len(keyPrefix), nil
return len(b) + len(keyPrefix), nil
}
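
A hedged round-trip sketch in the same package; `roundTrip` is hypothetical, and the chosen `Size` is arbitrary and only needs to exceed the unpadded encoding, per the check in NewBytes:

```go
// roundTrip builds a padded tx and recovers the payload again. FromBytes
// returns the padding untouched; callers may simply discard it.
func roundTrip() error {
	p := &Payload{Size: 1024}
	tx, err := NewBytes(p)
	if err != nil {
		return err
	}
	if _, err := FromBytes(tx); err != nil {
		return err
	}
	return nil
}
```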

View File

@@ -25,7 +25,6 @@ type BlockStore interface {
type DataPoint struct {
Duration time.Duration
BlockTime time.Time
Hash []byte
}
// Report contains the data calculated from reading the timestamped transactions
@@ -69,7 +68,7 @@ func (rs *Reports) ErrorCount() int {
return rs.errorCount
}
func (rs *Reports) addDataPoint(id uuid.UUID, l time.Duration, bt time.Time, hash []byte, conns, rate, size uint64) {
func (rs *Reports) addDataPoint(id uuid.UUID, l time.Duration, bt time.Time, conns, rate, size uint64) {
r, ok := rs.s[id]
if !ok {
r = Report{
@@ -82,7 +81,7 @@ func (rs *Reports) addDataPoint(id uuid.UUID, l time.Duration, bt time.Time, has
}
rs.s[id] = r
}
r.All = append(r.All, DataPoint{Duration: l, BlockTime: bt, Hash: hash})
r.All = append(r.All, DataPoint{Duration: l, BlockTime: bt})
if l > r.Max {
r.Max = l
}
@@ -124,12 +123,11 @@ func GenerateFromBlockStore(s BlockStore) (*Reports, error) {
id uuid.UUID
l time.Duration
bt time.Time
hash []byte
connections, rate, size uint64
err error
}
type txData struct {
tx types.Tx
tx []byte
bt time.Time
}
reports := &Reports{
@@ -163,7 +161,6 @@ func GenerateFromBlockStore(s BlockStore) (*Reports, error) {
pdc <- payloadData{
l: l,
bt: b.bt,
hash: b.tx.Hash(),
id: uuid.UUID(*idb),
connections: p.Connections,
rate: p.Rate,
@@ -205,7 +202,7 @@ func GenerateFromBlockStore(s BlockStore) (*Reports, error) {
reports.addError()
continue
}
reports.addDataPoint(pd.id, pd.l, pd.bt, pd.hash, pd.connections, pd.rate, pd.size)
reports.addDataPoint(pd.id, pd.l, pd.bt, pd.connections, pd.rate, pd.size)
}
reports.calculateAll()
return reports, nil

440
tools/valset-change/main.go Normal file
View File

@@ -0,0 +1,440 @@
package main
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"net/url"
"os"
"sort"
"strconv"
"strings"
"time"
dbm "github.com/tendermint/tm-db"
tmmath "github.com/tendermint/tendermint/libs/math"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
rpc "github.com/tendermint/tendermint/rpc/client/http"
"github.com/tendermint/tendermint/types"
)
var (
threshold = tmmath.Fraction{Numerator: 2, Denominator: 3}
startHeight int64 = 1
endHeight int64 = 0 // is interpreted as the head of the chain
interval int64 = 1000
)
const dbDirName = "validators"
func main() {
if err := run(); err != nil {
fmt.Printf("ERROR: %s\n", err.Error())
fmt.Printf(`
Usage:
collect <rpcEndpoint> [startHeight] [endHeight] [interval]
- retrieves validator sets from the provided endpoint, stepping from startHeight to endHeight by interval, and stores them
analyze [dbPath] [threshold]
- calculates the average amount of blocks before the threshold of the validator set is met
range [dbPath]
- returns the range of validator sets stored in the database
`)
}
}
func run() error {
if len(os.Args) < 2 {
return errors.New("expected either a collect or analyze command")
}
var err error
switch os.Args[1] {
case "collect":
if len(os.Args) < 3 {
return errors.New("expected rpc endpoint to be provided")
}
rpcEndpoint := os.Args[2]
_, err := url.Parse(rpcEndpoint)
if err != nil {
return fmt.Errorf("unable to parse rpc endpint: %w", err)
}
if len(os.Args) > 3 {
startHeight, err = strconv.ParseInt(os.Args[3], 10, 64)
if err != nil {
return err
}
}
if len(os.Args) > 4 {
endHeight, err = strconv.ParseInt(os.Args[4], 10, 64)
if err != nil {
return err
}
}
if len(os.Args) > 5 {
interval, err = strconv.ParseInt(os.Args[5], 10, 64)
if err != nil {
return err
}
}
return Collect(rpcEndpoint, startHeight, endHeight, interval)
case "analyze":
var dbDir string
if len(os.Args) > 2 {
dbDir = os.Args[2]
} else {
dbDir, err = os.Getwd()
if err != nil {
return err
}
}
if len(os.Args) > 3 {
threshold, err = tmmath.ParseFraction(os.Args[3])
if err != nil {
return err
}
}
return Analyze(dbDir, threshold)
case "range":
var dbDir string
if len(os.Args) > 2 {
dbDir = os.Args[2]
} else {
dbDir, err = os.Getwd()
if err != nil {
return err
}
}
return Range(dbDir)
default:
return fmt.Errorf("unknown command %s", os.Args[1])
}
}
func Collect(rpcEndpoint string, startHeight, endHeight, interval int64) error {
ctx := context.Background()
dbDir, err := os.Getwd()
if err != nil {
return err
}
db, err := dbm.NewGoLevelDB(dbDirName, dbDir)
if err != nil {
return err
}
defer db.Close()
client, err := rpc.New(rpcEndpoint, "/websocket")
if err != nil {
return err
}
var lastvs *types.ValidatorSet
height := startHeight
lastPrintedHeight := startHeight
fmt.Printf("Starting validator set collection, height: %d\n", height)
for {
subctx, cancel := context.WithTimeout(ctx, 15*time.Second)
vs, err := getValidators(subctx, client, height)
// release the timeout's resources promptly; defer would pile up in this loop
cancel()
if err != nil {
return fmt.Errorf("fetching validators at height %d: %w", height, err)
}
// we have reached the latest height
if vs == nil {
break
}
// we skip if there were no changes to the validator set
if lastvs != nil && validatorSetsAreEqual(lastvs, vs) {
height++
continue
}
heightKey := make([]byte, 8)
binary.LittleEndian.PutUint64(heightKey, uint64(height))
vsProto, err := vs.ToProto()
if err != nil {
return err
}
vsBytes, err := vsProto.Marshal()
if err != nil {
return err
}
if err := db.Set(heightKey, vsBytes); err != nil {
return fmt.Errorf("failed to persist at height %d: %w", height, err)
}
if height > lastPrintedHeight+(100*interval) {
fmt.Println(height)
lastPrintedHeight = height
}
height += interval
lastvs = vs
// stop once we've done the final iteration at the end height
if endHeight != 0 && height == endHeight+interval {
break
}
// clamp overshoot so the last iteration lands exactly on the end height
if endHeight != 0 && height > endHeight {
height = endHeight
}
}
fmt.Printf("Finished at height %d\n", height)
return nil
}
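
To make the stepping rule above concrete, a standalone distillation of the heights the loop visits, ignoring RPC calls and the equal-set skip; `visitHeights` is hypothetical and assumes endHeight > 0 and interval > 0. With startHeight 1, interval 1000 and endHeight 2500 it yields 1, 1001, 2001, 2500:

```go
// visitHeights mirrors the height-stepping in Collect: advance by
// interval, clamp the final step to endHeight, then stop.
func visitHeights(startHeight, endHeight, interval int64) []int64 {
	visited := []int64{}
	for height := startHeight; ; {
		visited = append(visited, height)
		height += interval
		if height == endHeight+interval {
			break // the end height has just been processed
		}
		if height > endHeight {
			height = endHeight // one final iteration at the end height
		}
	}
	return visited
}
```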
func getValidators(ctx context.Context, client *rpc.HTTP, height int64) (*types.ValidatorSet, error) {
const maxPages = 100
var (
perPage = 100
vals = []*types.Validator{}
page = 1
total = -1
)
for len(vals) != total && page <= maxPages {
res, err := client.Validators(ctx, &height, &page, &perPage)
if err != nil {
if strings.Contains(err.Error(), "EOF") {
time.Sleep(1 * time.Second)
continue
}
// if the endpoint does not have any more validator sets
// we return nil
if strings.Contains(err.Error(), "must be less than or equal to the current blockchain height") {
return nil, nil
}
return nil, err
}
// Validate response.
if len(res.Validators) == 0 {
return nil, fmt.Errorf("validator set is empty (height: %d, page: %d, per_page: %d)",
height, page, perPage)
}
if res.Total <= 0 {
return nil, fmt.Errorf("total number of vals is <= 0: %d (height: %d, page: %d, per_page: %d)",
res.Total, height, page, perPage)
}
total = res.Total
vals = append(vals, res.Validators...)
page++
}
valSet, err := types.ValidatorSetFromExistingValidators(vals)
if err != nil {
return nil, err
}
return valSet, nil
}
func Analyze(dbDir string, threshold tmmath.Fraction) error {
db, err := dbm.NewGoLevelDB(dbDirName, dbDir)
if err != nil {
return err
}
defer db.Close()
iter, err := db.Iterator(nil, nil)
if err != nil {
return err
}
defer iter.Close()
fmt.Print("Loading validators...")
// naively load everything into memory
// WARNING: check the size of the db before running
var (
vals = make(map[int64]*types.ValidatorSet)
lowestHeight int64
highestHeight int64
)
for ; iter.Valid(); iter.Next() {
height := int64(binary.LittleEndian.Uint64(iter.Key()))
if lowestHeight == 0 || height < lowestHeight {
lowestHeight = height
}
if highestHeight == 0 || height > highestHeight {
highestHeight = height
}
var vsProto = new(tmproto.ValidatorSet)
if err := vsProto.Unmarshal(iter.Value()); err != nil {
return err
}
vs, err := types.ValidatorSetFromProto(vsProto)
if err != nil {
return err
}
vals[height] = vs
}
var ranges = make([]int64, 0)
for height := lowestHeight; height < highestHeight; height++ {
vs, ok := vals[height]
if !ok {
continue
}
for comparisonHeight := height + 1; comparisonHeight <= highestHeight; comparisonHeight++ {
comparisonVS, ok := vals[comparisonHeight]
if !ok {
continue
}
if isValidatorSetChangeWithinThreshold(vs, comparisonVS, threshold) {
continue
}
// record how many heights passed before the validator set changed by more than the threshold
ranges = append(ranges, comparisonHeight-height)
break
}
fmt.Printf("processed height %d\n", height)
// the first height should find at least one range; otherwise we end early
if height == lowestHeight && len(ranges) == 0 {
fmt.Printf("validator set never varied from %d to %d\n", lowestHeight, highestHeight)
return nil
}
}
jsn, err := json.Marshal(map[string]interface{}{
"min": min(ranges),
"max": max(ranges),
"mean": mean(ranges),
"size": len(ranges),
})
if err != nil {
return err
}
fmt.Println(string(jsn))
return nil
}
func Range(dbDir string) error {
db, err := dbm.NewGoLevelDB(dbDirName, dbDir)
if err != nil {
return err
}
defer db.Close()
iter, err := db.Iterator(nil, nil)
if err != nil {
return err
}
defer iter.Close()
var (
lowestHeight int64
highestHeight int64
counter int = 0
)
for ; iter.Valid(); iter.Next() {
height := int64(binary.LittleEndian.Uint64(iter.Key()))
if lowestHeight == 0 || height < lowestHeight {
lowestHeight = height
}
if highestHeight == 0 || height > highestHeight {
highestHeight = height
}
counter++
}
fmt.Printf("Database contains %d validators sets ranging from %d to %d\n", counter, lowestHeight, highestHeight)
return nil
}
func validatorSetsAreEqual(v1, v2 *types.ValidatorSet) bool {
if len(v1.Validators) != len(v2.Validators) {
return false
}
sort.Sort(types.ValidatorsByVotingPower(v1.Validators))
sort.Sort(types.ValidatorsByVotingPower(v2.Validators))
for idx, val1 := range v1.Validators {
val2 := v2.Validators[idx]
if !bytes.Equal(val1.Address, val2.Address) || val1.VotingPower != val2.VotingPower {
return false
}
}
return true
}
func isValidatorSetChangeWithinThreshold(v1, v2 *types.ValidatorSet, threshold tmmath.Fraction) bool {
votingPowerNeeded := v2.TotalVotingPower() * int64(threshold.Numerator) / int64(threshold.Denominator)
tally := int64(0)
for _, val := range v1.Validators {
_, v := v2.GetByAddress(val.Address)
if v == nil {
continue
}
tally += v.VotingPower
if tally >= votingPowerNeeded {
return true
}
}
return false
}
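
For example, with the default 2/3 threshold and a v2 total voting power of 90, votingPowerNeeded is 60 (integer math: 90 * 2 / 3); if the validators carried over from v1 hold 65 of v2's power the change is within the threshold, while an overlap of only 50 reports a change beyond it.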
func min(arr []int64) int64 {
min := arr[0]
for _, value := range arr {
if value < min {
min = value
}
}
return min
}
func max(arr []int64) int64 {
max := arr[0]
for _, value := range arr {
if value > max {
max = value
}
}
return max
}
func mean(arr []int64) int64 {
sum := int64(0)
for _, value := range arr {
sum += value
}
return sum / int64(len(arr))
}

View File

@@ -1,12 +1,18 @@
package version
var TMCoreSemVer = TMVersionDefault
const (
// TMVersionDefault is used as the fallback version of Tendermint Core
// when not using git describe. It is formatted with semantic versioning.
TMCoreSemVer = "0.38.0-dev"
TMVersionDefault = "0.38.0-dev"
// ABCISemVer is the semantic version of the ABCI protocol
ABCISemVer = "1.0.0"
ABCISemVer = "1.0.0"
ABCIVersion = ABCISemVer
)
var (
// P2PProtocol versions all p2p behavior and msgs.
// This includes proposer selection.
P2PProtocol uint64 = 8
@@ -15,9 +21,3 @@ const (
// This includes validity of blocks and state updates.
BlockProtocol uint64 = 11
)
var (
// TMGitCommitHash uses git rev-parse HEAD to find the commit hash, which is helpful
// for the engineering team when working with the tendermint binary. See the Makefile.
TMGitCommitHash = ""
)