Compare commits


38 Commits

Author SHA1 Message Date
William Banfield  1ab868180c  update switch test to use new receive  2022-10-21 14:04:36 -04:00
William Banfield  27a64acc9a  remove all references to NewBroadcast and change to broadcast  2022-10-21 13:54:04 -04:00
William Banfield  34ae6ae9b6  pex remove decode / encode functions  2022-10-21 13:29:30 -04:00
William Banfield  e3e7e9ec9f  pex updated to wrap  2022-10-21 13:27:52 -04:00
William Banfield  5b86c5562a  statesync reactor users wrapper  2022-10-21 12:59:12 -04:00
William Banfield  36decbb4c8  remove superflous extra funcs in mempool  2022-10-21 12:42:51 -04:00
William Banfield  2f7844b0b9  mempool tests pass  2022-10-21 12:41:33 -04:00
William Banfield  4ae93fb379  remove superflous decode in mempool  2022-10-21 12:22:27 -04:00
William Banfield  ca2dcac7f6  remove superflous decode in evidence code  2022-10-21 12:21:03 -04:00
William Banfield  9e1c7bdfb8  remove superflous functions in consensus  2022-10-21 11:58:57 -04:00
William Banfield  4b5373b2a5  remove wrap from blocksync  2022-10-21 11:54:22 -04:00
William Banfield  9cad0c1b33  re-wrap to mimic v0.35  2022-10-21 11:45:59 -04:00
William Banfield  5c173cbb3d  consensus package uses protos directly  2022-10-20 18:21:28 -04:00
William Banfield  9742dac312  wrapper used in send  2022-10-20 17:27:57 -04:00
William Banfield  42fc1fc1cc  pex implements wrap  2022-10-20 17:25:54 -04:00
William Banfield  3c6e81ad15  consensus implements wrap  2022-10-20 17:20:40 -04:00
William Banfield  3ab55dcffc  statesync implements wrapper  2022-10-20 17:15:26 -04:00
William Banfield  11fe2c206d  blocksync implements unwrap  2022-10-20 17:11:23 -04:00
William Banfield  ade3cee699  assert unwrapper on mempool  2022-10-20 16:59:44 -04:00
William Banfield  8015f6e254  use correct type in wrap  2022-10-20 16:59:06 -04:00
William Banfield  24c1f44085  mempool implements wrap / unwrap  2022-10-20 16:51:12 -04:00
William Banfield  27db33dbda  split wrap into wrap / unwrap types  2022-10-20 16:50:57 -04:00
William Banfield  61834bb5fc  remove validate  2022-10-20 16:47:31 -04:00
William Banfield  2b7b2f2012  buildable  2022-10-20 16:47:22 -04:00
William Banfield  9b2c2ee3af  resurrect wrap types  2022-10-20 16:43:37 -04:00
William Banfield  b11060153e  put wrap into receive  2022-10-20 16:37:03 -04:00
William Banfield  a105d2d0ea  add wrapper  2022-10-20 15:19:32 -04:00
William Banfield  4d78096843  p2p: ressurrect the p2p envelope and use to calculate message metric  2022-10-20 14:19:10 -04:00
Sergio Mena  b42c439776  Fix TX payload for DO testnets (#9540)  2022-10-12 19:58:50 +02:00
    * Added print
    * Fix unmarshall
    * Fix unmarshalling
    * Simplified steps to unmarshall
    * minor
    * Use 'encoding/hex'
    * Forget about C, this is Go!
    * gosec warning
    * Set maximum payload size
    * nosec annotation
omahs  387bf6795a  Fix: typos (#9536)  2022-10-11 07:54:03 -04:00
    * Fix: typos
    * Fix: minor typo
Sergio Mena  4f3e87b2e4  Add changelog entry (#9535)  2022-10-10 16:21:06 +02:00
William Banfield  a371b1e3a8  blocksync: retry requests after timeout (#9518)  2022-10-10 14:58:24 +02:00
    * blocksync: retry requests after timeout
    * Minimize changes to re-send block request after timeout
    * TO REVERT: reduce queue capacity
    * Add reset
    * Revert "TO REVERT: reduce queue capacity" (reverts commit dd0fee56924c958bed2ab7733e1917eb88fb5957)
    * 30 seconds
    * don't reset the timer
    * Update blocksync/pool.go
    Co-authored-by: Sergio Mena <sergio@informal.systems>
    Co-authored-by: Callum Waters <cmwaters19@gmail.com>
dependabot[bot]  9dd99e9294  build(deps): Bump google.golang.org/grpc from 1.49.0 to 1.50.0 (#9529)  2022-10-10 10:59:44 +02:00
Callum Waters  4fd19a275e  indexer: move deduplication functionality purely to the kvindexer (#9473)  2022-10-07 15:54:44 +02:00
Callum Waters  8d26460f9d  rename blockchain to blocksync in certain areas (#9512)  2022-10-06 10:44:12 +02:00
Jasmina Malicevic  c0bdb2423a  security/p2p: prevent peers who errored being added to the peer_set (#9500)  2022-10-06 09:02:08 +02:00
    * Mark failed removal of peer to address security bug
    Co-authored-by: Callum Waters <cmwaters19@gmail.com>
Sergio Mena  cdd3479f20  Extend the load report tool to include transactions' hashes (#9509)  2022-10-05 21:16:45 +02:00
    * Add transaction hash to raw data
    * Add hash in formatted output
    * Cosmetic
Giuliano  b1dc5a6def  fix wrong axioms (#9511)  2022-10-05 11:38:21 +02:00
    Co-authored-by: Josef Widder <44643235+josef-widder@users.noreply.github.com>
62 changed files with 1407 additions and 945 deletions
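
Taken together, the file diffs below all move in one direction: reactors stop passing raw msgBytes and instead exchange a p2p.Envelope. The Envelope definition itself is not part of this compare view, so the following is a minimal sketch inferred from its call sites (the field names match the diffs; the Peer stand-in is an assumption):

package p2psketch

import "github.com/cosmos/gogoproto/proto"

// Peer stands in for the existing p2p.Peer interface (assumption for the sketch).
type Peer interface {
	Send(Envelope) bool
	TrySend(Envelope) bool
}

// Envelope pairs a typed proto message with the channel it travels on.
// Src is filled in by the switch on the receive path and is nil when sending.
type Envelope struct {
	ChannelID byte          // e.g. BlocksyncChannel, VoteChannel
	Src       Peer          // sender; only meaningful on receive
	Message   proto.Message // concrete type declared per channel via ChannelDescriptor.MessageType
}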

View File

@@ -96,3 +96,4 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [consensus] \#9229 fix round number of `enterPropose` when handling `RoundStepNewRound` timeout. (@fatcat22)
- [docker] \#9073 enable cross platform build using docker buildx
- [blocksync] \#9518 handle the case when the sending queue is full: retry block request after a timeout

View File

@@ -12,7 +12,7 @@ and hence to Tendermint.
* We are committed to providing a friendly, safe and welcoming environment for
all, regardless of level of experience, gender, gender identity and
expression, sexual orientation, disability, personal appearance, body size,
race, ethnicity, age, religion, nationality, or other similar characteristic.
race, ethnicity, age, religion, nationality, or other similar characteristics.
* On Slack, please avoid using overtly sexual nicknames or other nicknames that
might detract from a friendly, safe and welcoming environment for all.

View File

@@ -12,7 +12,7 @@ landing changes in `main`.
All work on the code base should be motivated by a [Github
Issue](https://github.com/tendermint/tendermint/issues).
[Search](https://github.com/tendermint/tendermint/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22)
is a good place start when looking for places to contribute. If you
is a good place to start when looking for places to contribute. If you
would like to work on an issue which already exists, please indicate so
by leaving a comment.
@@ -213,7 +213,7 @@ Changes with multiple classifications should be doubly included (eg. a bug fix
that is also a breaking change should be recorded under both).
Breaking changes are further subdivided according to the APIs/users they impact.
Any change that effects multiple APIs/users should be recorded multiply - for
Any change that affects multiple APIs/users should be recorded multiply - for
instance, a change to the `Blockchain Protocol` that removes a field from the
header should also be recorded under `CLI/RPC/Config` since the field will be
removed from the header in RPC responses as well.
@@ -247,7 +247,7 @@ To begin contributing, create a development branch either on `github.com/tenderm
Make changes, and before submitting a pull request, update the `CHANGELOG_PENDING.md` to record your change. Also, run either `git rebase` or `git merge` on top of the latest `main`. (Since pull requests are squash-merged, either is fine!)
Update the `UPGRADING.md` if the change you've made is breaking and the
instructions should be in place for a user on how he/she can upgrade it's
instructions should be in place for a user on how he/she can upgrade its
software (ABCI application, Tendermint-based blockchain, light client, wallet).
Once you have submitted a pull request label the pull request with either `R:minor`, if the change should be included in the next minor release, or `R:major`, if the change is meant for a major release.

View File

@@ -19,58 +19,6 @@ const (
BlockResponseMessageFieldKeySize
)
// EncodeMsg encodes a Protobuf message
func EncodeMsg(pb proto.Message) ([]byte, error) {
msg := bcproto.Message{}
switch pb := pb.(type) {
case *bcproto.BlockRequest:
msg.Sum = &bcproto.Message_BlockRequest{BlockRequest: pb}
case *bcproto.BlockResponse:
msg.Sum = &bcproto.Message_BlockResponse{BlockResponse: pb}
case *bcproto.NoBlockResponse:
msg.Sum = &bcproto.Message_NoBlockResponse{NoBlockResponse: pb}
case *bcproto.StatusRequest:
msg.Sum = &bcproto.Message_StatusRequest{StatusRequest: pb}
case *bcproto.StatusResponse:
msg.Sum = &bcproto.Message_StatusResponse{StatusResponse: pb}
default:
return nil, fmt.Errorf("unknown message type %T", pb)
}
bz, err := proto.Marshal(&msg)
if err != nil {
return nil, fmt.Errorf("unable to marshal %T: %w", pb, err)
}
return bz, nil
}
// DecodeMsg decodes a Protobuf message.
func DecodeMsg(bz []byte) (proto.Message, error) {
pb := &bcproto.Message{}
err := proto.Unmarshal(bz, pb)
if err != nil {
return nil, err
}
switch msg := pb.Sum.(type) {
case *bcproto.Message_BlockRequest:
return msg.BlockRequest, nil
case *bcproto.Message_BlockResponse:
return msg.BlockResponse, nil
case *bcproto.Message_NoBlockResponse:
return msg.NoBlockResponse, nil
case *bcproto.Message_StatusRequest:
return msg.StatusRequest, nil
case *bcproto.Message_StatusResponse:
return msg.StatusResponse, nil
default:
return nil, fmt.Errorf("unknown message type %T", msg)
}
}
// ValidateMsg validates a message.
func ValidateMsg(pb proto.Message) error {
if pb == nil {

View File

@@ -80,7 +80,7 @@ func TestBcStatusResponseMessageValidateBasic(t *testing.T) {
}
//nolint:lll // ignore line length in tests
func TestBlockchainMessageVectors(t *testing.T) {
func TestBlocksyncMessageVectors(t *testing.T) {
block := types.MakeBlock(int64(3), []types.Tx{types.Tx("Hello World")}, nil, nil)
block.Version.Block = 11 // overwrite updated protocol version

View File

@@ -32,6 +32,7 @@ const (
maxTotalRequesters = 600
maxPendingRequests = maxTotalRequesters
maxPendingRequestsPerPeer = 20
requestRetrySeconds = 30
// Minimum recv rate to ensure we're receiving blocks from a peer fast
// enough. If a peer is not sending us data at at least that rate, we
@@ -602,7 +603,7 @@ OUTER_LOOP:
}
peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
if peer == nil {
// log.Info("No peers available", "height", height)
bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height)
time.Sleep(requestIntervalMS * time.Millisecond)
continue PICK_PEER_LOOP
}
@@ -612,6 +613,7 @@ OUTER_LOOP:
bpr.peerID = peer.id
bpr.mtx.Unlock()
to := time.NewTimer(requestRetrySeconds * time.Second)
// Send request and wait.
bpr.pool.sendRequest(bpr.height, peer.id)
WAIT_LOOP:
@@ -624,6 +626,11 @@ OUTER_LOOP:
return
case <-bpr.Quit():
return
case <-to.C:
bpr.Logger.Debug("Retrying block request after timeout", "height", bpr.height, "peer", bpr.peerID)
// Simulate a redo
bpr.reset()
continue OUTER_LOOP
case peerID := <-bpr.redoCh:
if peerID == bpr.peerID {
bpr.reset()
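
The retry shape added above, reduced to a runnable, self-contained sketch (the timeout is shortened and the response channel is faked for the demo; only the timer/select structure mirrors pool.go):

package main

import (
	"fmt"
	"time"
)

func main() {
	const retryTimeout = 2 * time.Second // stands in for requestRetrySeconds = 30
	responses := make(chan string)

	// Fake a peer that only answers after two timeouts have already fired.
	go func() {
		time.Sleep(5 * time.Second)
		responses <- "block 42"
	}()

	for attempt := 1; ; attempt++ {
		to := time.NewTimer(retryTimeout)
		fmt.Printf("requesting block 42 (attempt %d)\n", attempt)
		select {
		case <-to.C:
			// Timeout: mirrors `case <-to.C: bpr.reset(); continue OUTER_LOOP`
			// above, i.e. forget the in-flight request and re-send.
			continue
		case res := <-responses:
			to.Stop()
			fmt.Println("received", res)
			return
		}
	}
}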

View File

@@ -30,7 +30,7 @@ const (
)
type consensusReactor interface {
// for when we switch from blockchain reactor and block sync to
// for when we switch from blocksync reactor and block sync to
// the consensus machine
SwitchToConsensus(state sm.State, skipWAL bool)
}
@@ -143,21 +143,20 @@ func (bcR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: MaxMsgSize,
MessageType: &bcproto.Message{},
},
}
}
// AddPeer implements Reactor by sending our state to peer.
func (bcR *Reactor) AddPeer(peer p2p.Peer) {
msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height()})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return
}
peer.Send(BlocksyncChannel, msgBytes)
peer.Send(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height(),
},
})
// it's OK if send fails. will try later in poolRoutine
// peer is added to the pool once we receive the first
@@ -182,69 +181,53 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
return false
}
msgBytes, err := EncodeMsg(&bcproto.BlockResponse{Block: bl})
if err != nil {
bcR.Logger.Error("could not marshal msg", "err", err)
return false
}
return src.TrySend(BlocksyncChannel, msgBytes)
return src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.BlockResponse{Block: bl},
})
}
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
msgBytes, err := EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return false
}
return src.TrySend(BlocksyncChannel, msgBytes)
return src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.NoBlockResponse{Height: msg.Height},
})
}
// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := DecodeMsg(msgBytes)
if err != nil {
bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
bcR.Switch.StopPeerForError(src, err)
func (bcR *Reactor) Receive(e p2p.Envelope) {
if err := ValidateMsg(e.Message); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
bcR.Switch.StopPeerForError(e.Src, err)
return
}
if err = ValidateMsg(msg); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
bcR.Switch.StopPeerForError(src, err)
return
}
bcR.Logger.Debug("Receive", "e.Src", e.Src, "chID", e.ChannelID, "msg", e.Message)
bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)
switch msg := msg.(type) {
switch msg := e.Message.(type) {
case *bcproto.BlockRequest:
bcR.respondToPeer(msg, src)
bcR.respondToPeer(msg, e.Src)
case *bcproto.BlockResponse:
bi, err := types.BlockFromProto(msg.Block)
if err != nil {
bcR.Logger.Error("Block content is invalid", "err", err)
return
}
bcR.pool.AddBlock(src.ID(), bi, len(msgBytes))
bcR.pool.AddBlock(e.Src.ID(), bi, msg.Block.Size())
case *bcproto.StatusRequest:
// Send peer our state.
msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
e.Src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
},
})
if err != nil {
bcR.Logger.Error("could not convert msg to protobut", "err", err)
return
}
src.TrySend(BlocksyncChannel, msgBytes)
case *bcproto.StatusResponse:
// Got a peer status. Unverified.
bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height)
bcR.pool.SetPeerRange(e.Src.ID(), msg.Base, msg.Height)
case *bcproto.NoBlockResponse:
bcR.Logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height)
bcR.Logger.Debug("Peer does not have requested block", "peer", e.Src, "height", msg.Height)
default:
bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
@@ -285,13 +268,10 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {
if peer == nil {
continue
}
msgBytes, err := EncodeMsg(&bcproto.BlockRequest{Height: request.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
continue
}
queued := peer.TrySend(BlocksyncChannel, msgBytes)
queued := peer.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.BlockRequest{Height: request.Height},
})
if !queued {
bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
}
@@ -430,13 +410,9 @@ FOR_LOOP:
// BroadcastStatusRequest broadcasts `BlockStore` base and height.
func (bcR *Reactor) BroadcastStatusRequest() error {
bm, err := EncodeMsg(&bcproto.StatusRequest{})
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
return fmt.Errorf("could not convert msg to proto: %w", err)
}
bcR.Switch.Broadcast(BlocksyncChannel, bm)
bcR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusRequest{},
})
return nil
}

View File

@@ -146,13 +146,13 @@ func newReactor(
}
bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))
bcReactor.SetLogger(logger.With("module", "blocksync"))
return ReactorPair{bcReactor, proxyApp}
}
func TestNoBlockResponse(t *testing.T) {
config = test.ResetTestRoot("blockchain_reactor_test")
config = test.ResetTestRoot("blocksync_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
@@ -164,7 +164,7 @@ func TestNoBlockResponse(t *testing.T) {
reactorPairs[1] = newReactor(t, log.TestingLogger(), genDoc, privVals, 0)
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
s.AddReactor("BLOCKSYNC", reactorPairs[i].reactor)
return s
}, p2p.Connect2Switches)
@@ -214,7 +214,7 @@ func TestNoBlockResponse(t *testing.T) {
// Alternatively we could actually dial a TCP conn but
// that seems extreme.
func TestBadBlockStopsPeer(t *testing.T) {
config = test.ResetTestRoot("blockchain_reactor_test")
config = test.ResetTestRoot("blocksync_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
@@ -239,7 +239,7 @@ func TestBadBlockStopsPeer(t *testing.T) {
reactorPairs[3] = newReactor(t, log.TestingLogger(), genDoc, privVals, 0)
switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
s.AddReactor("BLOCKSYNC", reactorPairs[i].reactor)
return s
}, p2p.Connect2Switches)
@@ -278,7 +278,7 @@ func TestBadBlockStopsPeer(t *testing.T) {
reactorPairs = append(reactorPairs, lastReactorPair)
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
s.AddReactor("BLOCKSYNC", reactorPairs[len(reactorPairs)-1].reactor)
return s
}, p2p.Connect2Switches)...)

View File

@@ -26,6 +26,7 @@ import (
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
@@ -165,10 +166,16 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
for i, peer := range peerList {
if i < len(peerList)/2 {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote1}))
peer.Send(p2p.Envelope{
Message: &tmcons.Vote{Vote: prevote1.ToProto()},
ChannelID: VoteChannel,
})
} else {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote2}))
peer.Send(p2p.Envelope{
Message: &tmcons.Vote{Vote: prevote2.ToProto()},
ChannelID: VoteChannel,
})
}
}
} else {
@@ -520,18 +527,26 @@ func sendProposalAndParts(
parts *types.PartSet,
) {
// proposal
msg := &ProposalMessage{Proposal: proposal}
peer.Send(DataChannel, MustEncode(msg))
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.Proposal{Proposal: *proposal.ToProto()},
})
// parts
for i := 0; i < int(parts.Total()); i++ {
part := parts.GetPart(i)
msg := &BlockPartMessage{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: part,
pp, err := part.ToProto()
if err != nil {
panic(err) // TODO: wbanfield better error handling
}
peer.Send(DataChannel, MustEncode(msg))
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.BlockPart{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: *pp,
},
})
}
// votes
@@ -539,9 +554,14 @@ func sendProposalAndParts(
prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header())
precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header())
cs.mtx.Unlock()
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: &tmcons.Vote{Vote: prevote.ToProto()},
})
peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: &tmcons.Vote{Vote: precommit.ToProto()},
})
}
//----------------------------------------
@@ -579,7 +599,7 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
br.reactor.RemovePeer(peer, reason)
}
func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
br.reactor.Receive(chID, peer, msgBytes)
func (br *ByzantineReactor) Receive(e p2p.Envelope) {
br.reactor.Receive(e)
}
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }

View File

@@ -7,6 +7,7 @@ import (
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
@@ -94,7 +95,10 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw
peers := sw.Peers().List()
for _, peer := range peers {
cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
peer.Send(p2p.Envelope{
Message: &tmcons.Vote{Vote: precommit.ToProto()},
ChannelID: VoteChannel,
})
}
}()
}

View File

@@ -4,8 +4,6 @@ import (
"errors"
"fmt"
"github.com/cosmos/gogoproto/proto"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
tmmath "github.com/tendermint/tendermint/libs/math"
@@ -15,7 +13,9 @@ import (
"github.com/tendermint/tendermint/types"
)
// MsgToProto takes a consensus message type and returns the proto defined consensus message
// MsgToProto takes a consensus message type and returns the proto defined consensus message.
//
// TODO: This needs to be removed, but WALToProto depends on this.
func MsgToProto(msg Message) (*tmcons.Message, error) {
if msg == nil {
return nil, errors.New("consensus: message is nil")
@@ -260,20 +260,6 @@ func MsgFromProto(msg *tmcons.Message) (Message, error) {
return pb, nil
}
// MustEncode takes the reactors msg, makes it proto and marshals it
// this mimics `MustMarshalBinaryBare` in that is panics on error
func MustEncode(msg Message) []byte {
pb, err := MsgToProto(msg)
if err != nil {
panic(err)
}
enc, err := proto.Marshal(pb)
if err != nil {
panic(err)
}
return enc
}
// WALToProto takes a WAL message and return a proto walMessage and error
func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
var pb tmcons.WALMessage

View File

@@ -7,8 +7,6 @@ import (
"sync"
"time"
"github.com/cosmos/gogoproto/proto"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/bits"
tmevents "github.com/tendermint/tendermint/libs/events"
@@ -148,6 +146,7 @@ func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
Priority: 6,
SendQueueCapacity: 100,
RecvMessageCapacity: maxMsgSize,
MessageType: &tmcons.Message{},
},
{
ID: DataChannel, // maybe split between gossiping current block and catchup stuff
@@ -156,6 +155,7 @@ func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
SendQueueCapacity: 100,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: maxMsgSize,
MessageType: &tmcons.Message{},
},
{
ID: VoteChannel,
@@ -163,6 +163,7 @@ func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
SendQueueCapacity: 100,
RecvBufferCapacity: 100 * 100,
RecvMessageCapacity: maxMsgSize,
MessageType: &tmcons.Message{},
},
{
ID: VoteSetBitsChannel,
@@ -170,6 +171,7 @@ func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
SendQueueCapacity: 2,
RecvBufferCapacity: 1024,
RecvMessageCapacity: maxMsgSize,
MessageType: &tmcons.Message{},
},
}
}
@@ -223,34 +225,44 @@ func (conR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Peer state updates can happen in parallel, but processing of
// proposals, block parts, and votes are ordered by the receiveRoutine
// NOTE: blocks on consensus state for proposals, block parts, and votes
func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
func (conR *Reactor) Receive(e p2p.Envelope) {
if !conR.IsRunning() {
conR.Logger.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)
conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID)
return
}
msg, err := decodeMsg(msgBytes)
if w, ok := e.Message.(p2p.Wrapper); ok {
var err error
e.Message, err = w.Wrap()
if err != nil {
conR.Logger.Error("Error wrapping message", "src", e.Src, "chId", e.ChannelID, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
}
msg, err := MsgFromProto(e.Message.(*tmcons.Message))
if err != nil {
conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
conR.Switch.StopPeerForError(src, err)
conR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
if err = msg.ValidateBasic(); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(src, err)
conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg)
// Get peer states
ps, ok := src.Get(types.PeerStateKey).(*PeerState)
ps, ok := e.Src.Get(types.PeerStateKey).(*PeerState)
if !ok {
panic(fmt.Sprintf("Peer %v has no state", src))
panic(fmt.Sprintf("Peer %v has no state", e.Src))
}
switch chID {
switch e.ChannelID {
case StateChannel:
switch msg := msg.(type) {
case *NewRoundStepMessage:
@@ -258,8 +270,8 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
initialHeight := conR.conS.state.InitialHeight
conR.conS.mtx.Unlock()
if err = msg.ValidateHeight(initialHeight); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(src, err)
conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
ps.ApplyNewRoundStepMessage(msg)
@@ -278,7 +290,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
// Peer claims to have a maj23 for some BlockID at H,R,S,
err := votes.SetPeerMaj23(msg.Round, msg.Type, ps.peer.ID(), msg.BlockID)
if err != nil {
conR.Switch.StopPeerForError(src, err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
// Respond with a VoteSetBitsMessage showing which votes we have.
@@ -292,13 +304,19 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
default:
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
}
src.TrySend(VoteSetBitsChannel, MustEncode(&VoteSetBitsMessage{
eMsg := &tmcons.VoteSetBits{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID,
Votes: ourVotes,
}))
BlockID: msg.BlockID.ToProto(),
}
if votes := ourVotes.ToProto(); votes != nil {
eMsg.Votes = *votes
}
e.Src.TrySend(p2p.Envelope{
ChannelID: VoteSetBitsChannel,
Message: eMsg,
})
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
@@ -311,13 +329,13 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
switch msg := msg.(type) {
case *ProposalMessage:
ps.SetHasProposal(msg.Proposal)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1)
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
@@ -337,7 +355,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote)
cs.peerMsgQueue <- msgInfo{msg, src.ID()}
cs.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
default:
// don't punish (leave room for soft upgrades)
@@ -376,7 +394,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
}
default:
conR.Logger.Error(fmt.Sprintf("Unknown chId %X", chID))
conR.Logger.Error(fmt.Sprintf("Unknown chId %X", e.ChannelID))
}
}
@@ -430,29 +448,39 @@ func (conR *Reactor) unsubscribeFromBroadcastEvents() {
func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
nrsMsg := makeRoundStepMessage(rs)
conR.Switch.Broadcast(StateChannel, MustEncode(nrsMsg))
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: nrsMsg,
})
}
func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
csMsg := &NewValidBlockMessage{
psh := rs.ProposalBlockParts.Header()
csMsg := &tmcons.NewValidBlock{
Height: rs.Height,
Round: rs.Round,
BlockPartSetHeader: rs.ProposalBlockParts.Header(),
BlockParts: rs.ProposalBlockParts.BitArray(),
BlockPartSetHeader: psh.ToProto(),
BlockParts: rs.ProposalBlockParts.BitArray().ToProto(),
IsCommit: rs.Step == cstypes.RoundStepCommit,
}
conR.Switch.Broadcast(StateChannel, MustEncode(csMsg))
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: csMsg,
})
}
// Broadcasts HasVoteMessage to peers that care.
func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
msg := &HasVoteMessage{
msg := &tmcons.HasVote{
Height: vote.Height,
Round: vote.Round,
Type: vote.Type,
Index: vote.ValidatorIndex,
}
conR.Switch.Broadcast(StateChannel, MustEncode(msg))
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: msg,
})
/*
// TODO: Make this broadcast more selective.
for _, peer := range conR.Switch.Peers().List() {
@@ -463,7 +491,11 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
prs := ps.GetRoundState()
if prs.Height == vote.Height {
// TODO: Also filter on round?
peer.TrySend(StateChannel, struct{ ConsensusMessage }{msg})
e := p2p.Envelope{
ChannelID: StateChannel,
Message: msg,
}
peer.TrySend(e)
} else {
// Height doesn't match
// TODO: check a field, maybe CatchupCommitRound?
@@ -473,11 +505,11 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
*/
}
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage) {
nrsMsg = &NewRoundStepMessage{
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *tmcons.NewRoundStep) {
nrsMsg = &tmcons.NewRoundStep{
Height: rs.Height,
Round: rs.Round,
Step: rs.Step,
Step: uint32(rs.Step),
SecondsSinceStartTime: int64(time.Since(rs.StartTime).Seconds()),
LastCommitRound: rs.LastCommit.GetRound(),
}
@@ -487,7 +519,10 @@ func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage)
func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
rs := conR.getRoundState()
nrsMsg := makeRoundStepMessage(rs)
peer.Send(StateChannel, MustEncode(nrsMsg))
peer.Send(p2p.Envelope{
ChannelID: StateChannel,
Message: nrsMsg,
})
}
func (conR *Reactor) updateRoundStateRoutine() {
@@ -526,13 +561,19 @@ OUTER_LOOP:
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) {
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
part := rs.ProposalBlockParts.GetPart(index)
msg := &BlockPartMessage{
Height: rs.Height, // This tells peer that this part applies to us.
Round: rs.Round, // This tells peer that this part applies to us.
Part: part,
parts, err := part.ToProto()
if err != nil {
panic(err)
}
logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
if peer.Send(DataChannel, MustEncode(msg)) {
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.BlockPart{
Height: rs.Height, // This tells peer that this part applies to us.
Round: rs.Round, // This tells peer that this part applies to us.
Part: *parts,
},
}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
continue OUTER_LOOP
@@ -578,9 +619,11 @@ OUTER_LOOP:
if rs.Proposal != nil && !prs.Proposal {
// Proposal: share the proposal metadata with peer.
{
msg := &ProposalMessage{Proposal: rs.Proposal}
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
if peer.Send(DataChannel, MustEncode(msg)) {
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.Proposal{Proposal: *rs.Proposal.ToProto()},
}) {
// NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected!
ps.SetHasProposal(rs.Proposal)
}
@@ -590,13 +633,15 @@ OUTER_LOOP:
// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
if 0 <= rs.Proposal.POLRound {
msg := &ProposalPOLMessage{
Height: rs.Height,
ProposalPOLRound: rs.Proposal.POLRound,
ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
}
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
peer.Send(DataChannel, MustEncode(msg))
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.ProposalPOL{
Height: rs.Height,
ProposalPolRound: rs.Proposal.POLRound,
ProposalPol: *rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray().ToProto(),
},
})
}
continue OUTER_LOOP
}
@@ -633,13 +678,20 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
return
}
// Send the part
msg := &BlockPartMessage{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: part,
}
logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
if peer.Send(DataChannel, MustEncode(msg)) {
pp, err := part.ToProto()
if err != nil {
logger.Error("Could not convert part to proto", "index", index, "error", err)
return
}
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.BlockPart{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: *pp,
},
}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
} else {
logger.Debug("Sending block part for catchup failed")
@@ -798,12 +850,16 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrevoteType,
BlockID: maj23,
}))
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrevoteType,
BlockID: maj23.ToProto(),
},
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
@@ -815,12 +871,15 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrecommitType,
BlockID: maj23,
}))
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrecommitType,
BlockID: maj23.ToProto(),
},
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
@@ -832,12 +891,16 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
Height: prs.Height,
Round: prs.ProposalPOLRound,
Type: tmproto.PrevoteType,
BlockID: maj23,
}))
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.ProposalPOLRound,
Type: tmproto.PrevoteType,
BlockID: maj23.ToProto(),
},
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
@@ -852,12 +915,15 @@ OUTER_LOOP:
if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() &&
prs.Height >= conR.conS.blockStore.Base() {
if commit := conR.conS.LoadCommit(prs.Height); commit != nil {
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
Height: prs.Height,
Round: commit.Round,
Type: tmproto.PrecommitType,
BlockID: commit.BlockID,
}))
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: commit.Round,
Type: tmproto.PrecommitType,
BlockID: commit.BlockID.ToProto(),
},
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
@@ -1071,9 +1137,13 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(votes); ok {
msg := &VoteMessage{vote}
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
if ps.peer.Send(VoteChannel, MustEncode(msg)) {
if ps.peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: &tmcons.Vote{
Vote: vote.ToProto(),
},
}) {
ps.SetHasVote(vote)
return true
}
@@ -1439,15 +1509,6 @@ func init() {
tmjson.RegisterType(&VoteSetBitsMessage{}, "tendermint/VoteSetBits")
}
func decodeMsg(bz []byte) (msg Message, err error) {
pb := &tmcons.Message{}
if err = proto.Unmarshal(bz, pb); err != nil {
return msg, err
}
return MsgFromProto(pb)
}
//-------------------------------------
// NewRoundStepMessage is sent for every step taken in the ConsensusState.
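
The Receive path above type-asserts e.Message against p2p.Wrapper and calls Wrap(). The interface definition is not included in this compare, so the following sketch is reverse-engineered from that one call site:

package p2p // sketch only; the real definition is not in this compare view

import "github.com/cosmos/gogoproto/proto"

// Wrapper is the assumed interface behind `e.Message.(p2p.Wrapper)` above.
type Wrapper interface {
	proto.Message
	// Wrap places the concrete channel message inside its container type,
	// e.g. a *tmcons.Vote becomes a *tmcons.Message with the Sum oneof set.
	Wrap() (proto.Message, error)
}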

View File

@@ -33,6 +33,7 @@ import (
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
p2pmock "github.com/tendermint/tendermint/p2p/mock"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
statemocks "github.com/tendermint/tendermint/state/mocks"
@@ -265,15 +266,18 @@ func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {
var (
reactor = reactors[0]
peer = p2pmock.NewPeer(nil)
msg = MustEncode(&HasVoteMessage{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType})
)
reactor.InitPeer(peer)
// simulate switch calling Receive before AddPeer
assert.NotPanics(t, func() {
reactor.Receive(StateChannel, peer, msg)
reactor.Receive(p2p.Envelope{
ChannelID: StateChannel,
Src: peer,
Message: &tmcons.HasVote{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType},
})
reactor.AddPeer(peer)
})
}
@@ -288,15 +292,18 @@ func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
var (
reactor = reactors[0]
peer = p2pmock.NewPeer(nil)
msg = MustEncode(&HasVoteMessage{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType})
)
// we should call InitPeer here
// simulate switch calling Receive before AddPeer
assert.Panics(t, func() {
reactor.Receive(StateChannel, peer, msg)
reactor.Receive(p2p.Envelope{
ChannelID: StateChannel,
Src: peer,
Message: &tmcons.HasVote{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType},
})
})
}

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"time"
"github.com/cosmos/gogoproto/proto"
clist "github.com/tendermint/tendermint/libs/clist"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
@@ -55,6 +56,7 @@ func (evR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
ID: EvidenceChannel,
Priority: 6,
RecvMessageCapacity: maxMsgSize,
MessageType: &tmproto.EvidenceList{},
},
}
}
@@ -66,11 +68,11 @@ func (evR *Reactor) AddPeer(peer p2p.Peer) {
// Receive implements Reactor.
// It adds any received evidence to the evpool.
func (evR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
evis, err := decodeMsg(msgBytes)
func (evR *Reactor) Receive(e p2p.Envelope) {
evis, err := evidenceListFromProto(e.Message)
if err != nil {
evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
evR.Switch.StopPeerForError(src, err)
evR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
evR.Switch.StopPeerForError(e.Src, err)
return
}
@@ -80,7 +82,7 @@ func (evR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
case *types.ErrInvalidEvidence:
evR.Logger.Error(err.Error())
// punish peer
evR.Switch.StopPeerForError(src, err)
evR.Switch.StopPeerForError(e.Src, err)
return
case nil:
default:
@@ -126,11 +128,15 @@ func (evR *Reactor) broadcastEvidenceRoutine(peer p2p.Peer) {
evis := evR.prepareEvidenceMessage(peer, ev)
if len(evis) > 0 {
evR.Logger.Debug("Gossiping evidence to peer", "ev", ev, "peer", peer)
msgBytes, err := encodeMsg(evis)
evp, err := evidenceListToProto(evis)
if err != nil {
panic(err)
}
success := peer.Send(EvidenceChannel, msgBytes)
success := peer.Send(p2p.Envelope{
ChannelID: EvidenceChannel,
Message: evp,
})
if !success {
time.Sleep(peerRetryMessageIntervalMS * time.Millisecond)
continue
@@ -210,7 +216,7 @@ type PeerState interface {
// encodemsg takes a array of evidence
// returns the byte encoding of the List Message
func encodeMsg(evis []types.Evidence) ([]byte, error) {
func evidenceListToProto(evis []types.Evidence) (*tmproto.EvidenceList, error) {
evi := make([]tmproto.Evidence, len(evis))
for i := 0; i < len(evis); i++ {
ev, err := types.EvidenceToProto(evis[i])
@@ -222,19 +228,13 @@ func encodeMsg(evis []types.Evidence) ([]byte, error) {
epl := tmproto.EvidenceList{
Evidence: evi,
}
return epl.Marshal()
return &epl, nil
}
// decodemsg takes an array of bytes
// returns an array of evidence
func decodeMsg(bz []byte) (evis []types.Evidence, err error) {
lm := tmproto.EvidenceList{}
if err := lm.Unmarshal(bz); err != nil {
return nil, err
}
func evidenceListFromProto(m proto.Message) ([]types.Evidence, error) {
lm := m.(*tmproto.EvidenceList)
evis = make([]types.Evidence, len(lm.Evidence))
evis := make([]types.Evidence, len(lm.Evidence))
for i := 0; i < len(lm.Evidence); i++ {
ev, err := types.EvidenceFromProto(&lm.Evidence[i])
if err != nil {

View File

@@ -208,7 +208,10 @@ func TestReactorBroadcastEvidenceMemoryLeak(t *testing.T) {
// i.e. broadcastEvidenceRoutine finishes when peer is stopped
defer leaktest.CheckTimeout(t, 10*time.Second)()
p.On("Send", evidence.EvidenceChannel, mock.AnythingOfType("[]uint8")).Return(false)
p.On("Send", mock.MatchedBy(func(i interface{}) bool {
e, ok := i.(p2p.Envelope)
return ok && e.ChannelID == evidence.EvidenceChannel
})).Return(false)
quitChan := make(<-chan struct{})
p.On("Quit").Return(quitChan)
ps := peerState{2}

go.mod (2 changed lines)
View File

@@ -34,7 +34,7 @@ require (
github.com/tendermint/tm-db v0.6.6
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa
golang.org/x/net v0.0.0-20220812174116-3211cb980234
google.golang.org/grpc v1.49.0
google.golang.org/grpc v1.50.0
)
require (

go.sum (4 changed lines)
View File

@@ -1696,8 +1696,8 @@ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k=
google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw=
google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
google.golang.org/grpc v1.50.0 h1:fPVVDxY9w++VjTZsYvXWqEf9Rqar/e+9zYfxKK+W+YU=
google.golang.org/grpc v1.50.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

View File

@@ -134,6 +134,7 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
ID: mempool.MempoolChannel,
Priority: 5,
RecvMessageCapacity: batchMsg.Size(),
MessageType: &protomem.Message{},
},
}
}
@@ -154,27 +155,33 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := memR.decodeMsg(msgBytes)
if err != nil {
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
memR.Switch.StopPeerForError(src, err)
return
}
memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(src)}
if src != nil {
txInfo.SenderP2PID = src.ID()
}
for _, tx := range msg.Txs {
err = memR.mempool.CheckTx(tx, nil, txInfo)
if errors.Is(err, mempool.ErrTxInCache) {
memR.Logger.Debug("Tx already exists in cache", "tx", tx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err)
func (memR *Reactor) Receive(e p2p.Envelope) {
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
switch msg := e.Message.(type) {
case *protomem.Txs:
protoTxs := msg.GetTxs()
if len(protoTxs) == 0 {
memR.Logger.Error("received empty txs from peer", "src", e.Src)
return
}
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
if e.Src != nil {
txInfo.SenderP2PID = e.Src.ID()
}
var err error
for _, tx := range protoTxs {
ntx := types.Tx(tx)
err = memR.mempool.CheckTx(ntx, nil, txInfo)
if errors.Is(err, mempool.ErrTxInCache) {
memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err)
}
}
default:
memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
return
}
// broadcasting happens from go routines per peer
@@ -234,18 +241,14 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
// https://github.com/tendermint/tendermint/issues/5796
if _, ok := memTx.senders.Load(peerID); !ok {
msg := protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
success := peer.Send(p2p.Envelope{
ChannelID: mempool.MempoolChannel,
Message: &protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
},
},
}
bz, err := msg.Marshal()
if err != nil {
panic(err)
}
success := peer.Send(mempool.MempoolChannel, bz)
})
if !success {
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
continue
@@ -264,35 +267,6 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
}
}
func (memR *Reactor) decodeMsg(bz []byte) (TxsMessage, error) {
msg := protomem.Message{}
err := msg.Unmarshal(bz)
if err != nil {
return TxsMessage{}, err
}
var message TxsMessage
if i, ok := msg.Sum.(*protomem.Message_Txs); ok {
txs := i.Txs.GetTxs()
if len(txs) == 0 {
return message, errors.New("empty TxsMessage")
}
decoded := make([]types.Tx, len(txs))
for j, tx := range txs {
decoded[j] = types.Tx(tx)
}
message = TxsMessage{
Txs: decoded,
}
return message, nil
}
return message, fmt.Errorf("msg type: %T is not supported", msg)
}
// TxsMessage is a Message containing transactions.
type TxsMessage struct {
Txs []types.Tx

View File

@@ -22,6 +22,7 @@ import (
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/mock"
memproto "github.com/tendermint/tendermint/proto/tendermint/mempool"
protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)
@@ -279,7 +280,12 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) {
for i := 0; i < mempool.MaxActiveIDs+1; i++ {
peer := mock.NewPeer(nil)
reactor.Receive(mempool.MempoolChannel, peer, []byte{0x1, 0x2, 0x3})
reactor.Receive(p2p.Envelope{
ChannelID: mempool.MempoolChannel,
Src: peer,
Message: &protomem.Txs{
Txs: [][]byte{{0x01, 0x02, 0x03}},
}})
reactor.AddPeer(peer)
}
}

View File

@@ -133,6 +133,7 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
ID: mempool.MempoolChannel,
Priority: 5,
RecvMessageCapacity: batchMsg.Size(),
MessageType: &protomem.Message{},
},
}
}
@@ -153,27 +154,35 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := memR.decodeMsg(msgBytes)
if err != nil {
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
memR.Switch.StopPeerForError(src, err)
func (memR *Reactor) Receive(e p2p.Envelope) {
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
switch msg := e.Message.(type) {
case *protomem.Txs:
protoTxs := msg.GetTxs()
if len(protoTxs) == 0 {
memR.Logger.Error("received empty txs from peer", "src", e.Src)
return
}
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
if e.Src != nil {
txInfo.SenderP2PID = e.Src.ID()
}
var err error
for _, tx := range protoTxs {
ntx := types.Tx(tx)
err = memR.mempool.CheckTx(ntx, nil, txInfo)
if errors.Is(err, mempool.ErrTxInCache) {
memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err)
}
}
default:
memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
return
}
memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(src)}
if src != nil {
txInfo.SenderP2PID = src.ID()
}
for _, tx := range msg.Txs {
err = memR.mempool.CheckTx(tx, nil, txInfo)
if err == mempool.ErrTxInCache {
memR.Logger.Debug("Tx already exists in cache", "tx", tx.String())
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err)
}
}
// broadcasting happens from go routines per peer
}
@@ -233,18 +242,14 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
// NOTE: Transaction batching was disabled due to
// https://github.com/tendermint/tendermint/issues/5796
if !memTx.HasPeer(peerID) {
msg := protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
success := peer.Send(p2p.Envelope{
ChannelID: mempool.MempoolChannel,
Message: &protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
},
},
}
bz, err := msg.Marshal()
if err != nil {
panic(err)
}
success := peer.Send(mempool.MempoolChannel, bz)
})
if !success {
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
continue
@@ -268,37 +273,6 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
//-----------------------------------------------------------------------------
// Messages
func (memR *Reactor) decodeMsg(bz []byte) (TxsMessage, error) {
msg := protomem.Message{}
err := msg.Unmarshal(bz)
if err != nil {
return TxsMessage{}, err
}
var message TxsMessage
if i, ok := msg.Sum.(*protomem.Message_Txs); ok {
txs := i.Txs.GetTxs()
if len(txs) == 0 {
return message, errors.New("empty TxsMessage")
}
decoded := make([]types.Tx, len(txs))
for j, tx := range txs {
decoded[j] = types.Tx(tx)
}
message = TxsMessage{
Txs: decoded,
}
return message, nil
}
return message, fmt.Errorf("msg type: %T is not supported", msg)
}
//-------------------------------------
// TxsMessage is a Message containing transactions.
type TxsMessage struct {
Txs []types.Tx

View File

@@ -31,7 +31,7 @@ To replace the built-in p2p.Reactor, use the CustomReactors option:
dbProvider,
metricsProvider,
logger,
CustomReactors(map[string]p2p.Reactor{"BLOCKCHAIN": customBlockchainReactor}),
CustomReactors(map[string]p2p.Reactor{"BLOCKSYNC": customBlocksyncReactor}),
)
The list of existing reactors can be found in CustomReactors documentation.

View File

@@ -146,7 +146,7 @@ type blockSyncReactor interface {
// result in replacing it with the custom one.
//
// - MEMPOOL
// - BLOCKCHAIN
// - BLOCKSYNC
// - CONSENSUS
// - EVIDENCE
// - PEX
@@ -303,7 +303,7 @@ func createAndStartIndexerService(
blockIndexer = &blockidxnull.BlockerIndexer{}
}
indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
indexerService.SetLogger(logger.With("module", "txindex"))
if err := indexerService.Start(); err != nil {
@@ -441,7 +441,7 @@ func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider,
return evidenceReactor, evidencePool, nil
}
func createBlockchainReactor(config *cfg.Config,
func createBlocksyncReactor(config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore *store.BlockStore,
@@ -457,7 +457,7 @@ func createBlockchainReactor(config *cfg.Config,
return nil, fmt.Errorf("unknown fastsync version %s", config.BlockSync.Version)
}
bcReactor.SetLogger(logger.With("module", "blockchain"))
bcReactor.SetLogger(logger.With("module", "blocksync"))
return bcReactor, nil
}
@@ -584,7 +584,7 @@ func createSwitch(config *cfg.Config,
)
sw.SetLogger(p2pLogger)
sw.AddReactor("MEMPOOL", mempoolReactor)
sw.AddReactor("BLOCKCHAIN", bcReactor)
sw.AddReactor("BLOCKSYNC", bcReactor)
sw.AddReactor("CONSENSUS", consensusReactor)
sw.AddReactor("EVIDENCE", evidenceReactor)
sw.AddReactor("STATESYNC", stateSyncReactor)
@@ -803,7 +803,7 @@ func NewNode(config *cfg.Config,
return nil, err
}
// make block executor for consensus and blockchain reactors to execute blocks
// make block executor for consensus and blocksync reactors to execute blocks
blockExec := sm.NewBlockExecutor(
stateStore,
logger.With("module", "state"),
@@ -814,10 +814,10 @@ func NewNode(config *cfg.Config,
sm.BlockExecutorWithMetrics(smMetrics),
)
// Make BlockchainReactor. Don't start block sync if we're doing a state sync first.
bcReactor, err := createBlockchainReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger)
// Make BlocksyncReactor. Don't start block sync if we're doing a state sync first.
bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger)
if err != nil {
return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
return nil, fmt.Errorf("could not create blocksync reactor: %w", err)
}
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
@@ -990,7 +990,7 @@ func (n *Node) OnStart() error {
if n.stateSync {
bcR, ok := n.bcReactor.(blockSyncReactor)
if !ok {
return fmt.Errorf("this blockchain reactor does not support switching from state sync")
return fmt.Errorf("this blocksync reactor does not support switching from state sync")
}
err := startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, n.stateSyncProvider,
n.config.StateSync, n.config.BlockSyncMode, n.stateStore, n.blockStore, n.stateSyncGenesis)

View File

@@ -427,7 +427,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
RecvMessageCapacity: 100,
},
}
customBlockchainReactor := p2pmock.NewReactor()
customBlocksyncReactor := p2pmock.NewReactor()
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
require.NoError(t, err)
@@ -440,7 +440,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
DefaultDBProvider,
DefaultMetricsProvider(config.Instrumentation),
log.TestingLogger(),
CustomReactors(map[string]p2p.Reactor{"FOO": cr, "BLOCKCHAIN": customBlockchainReactor}),
CustomReactors(map[string]p2p.Reactor{"FOO": cr, "BLOCKSYNC": customBlocksyncReactor}),
)
require.NoError(t, err)
@@ -451,8 +451,8 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
assert.True(t, cr.IsRunning())
assert.Equal(t, cr, n.Switch().Reactor("FOO"))
assert.True(t, customBlockchainReactor.IsRunning())
assert.Equal(t, customBlockchainReactor, n.Switch().Reactor("BLOCKCHAIN"))
assert.True(t, customBlocksyncReactor.IsRunning())
assert.Equal(t, customBlocksyncReactor, n.Switch().Reactor("BLOCKSYNC"))
channels := n.NodeInfo().(p2p.DefaultNodeInfo).Channels
assert.Contains(t, channels, mempl.MempoolChannel)

View File

@@ -38,13 +38,13 @@ type Reactor interface {
// or other reason).
RemovePeer(peer Peer, reason interface{})
// Receive is called by the switch when msgBytes is received from the peer.
// Receive is called by the switch when a message is received from the peer.
//
// NOTE reactor can not keep msgBytes around after Receive completes without
// copying.
//
// CONTRACT: msgBytes are not nil.
Receive(chID byte, peer Peer, msgBytes []byte)
Receive(Envelope)
}
//--------------------------------------
@@ -64,8 +64,8 @@ func NewBaseReactor(name string, impl Reactor) *BaseReactor {
func (br *BaseReactor) SetSwitch(sw *Switch) {
br.Switch = sw
}
func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil }
func (*BaseReactor) AddPeer(peer Peer) {}
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {}
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }
func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil }
func (*BaseReactor) AddPeer(peer Peer) {}
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (*BaseReactor) Receive(e Envelope) {}
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }

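For orientation, here is a minimal sketch of a reactor written against the new Envelope-based interface. The pingReactor name and the 0x42 channel ID are hypothetical; the rest uses only APIs introduced in this diff.

package example

import (
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/p2p/conn"
	tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)

// pingReactor is a hypothetical reactor that answers PEX requests on its own channel.
type pingReactor struct {
	p2p.BaseReactor
}

func newPingReactor() *pingReactor {
	r := &pingReactor{}
	r.BaseReactor = *p2p.NewBaseReactor("PingReactor", r)
	return r
}

// GetChannels declares the expected wire type so the p2p layer can unmarshal
// (and unwrap) inbound bytes before calling Receive.
func (r *pingReactor) GetChannels() []*conn.ChannelDescriptor {
	return []*conn.ChannelDescriptor{
		{ID: 0x42, Priority: 1, RecvMessageCapacity: 1024, MessageType: &tmp2p.Message{}},
	}
}

// Receive is handed an already-decoded proto message instead of raw bytes.
func (r *pingReactor) Receive(e p2p.Envelope) {
	if _, ok := e.Message.(*tmp2p.PexRequest); ok {
		e.Src.Send(p2p.Envelope{ChannelID: 0x42, Message: &tmp2p.PexAddrs{}})
	}
}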
View File

@@ -724,6 +724,7 @@ type ChannelDescriptor struct {
SendQueueCapacity int
RecvBufferCapacity int
RecvMessageCapacity int
MessageType proto.Message
}
func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {

View File

@@ -145,6 +145,13 @@ func (e ErrTransportClosed) Error() string {
return "transport has been closed"
}
// ErrPeerRemoval is raised when attempting to remove a peer results in an error.
type ErrPeerRemoval struct{}
func (e ErrPeerRemoval) Error() string {
return "peer removal failed"
}
//-------------------------------------------------------------------
type ErrNetAddressNoID struct {

View File

@@ -44,15 +44,29 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Name: "num_txs",
Help: "Number of transactions submitted by each peer.",
}, append(labels, "peer_id")).With(labelsAndValues...),
MessageReceiveBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "message_receive_bytes_total",
Help: "Number of bytes of each message type received.",
}, append(labels, "message_type")).With(labelsAndValues...),
MessageSendBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "message_send_bytes_total",
Help: "Number of bytes of each message type sent.",
}, append(labels, "message_type")).With(labelsAndValues...),
}
}
func NopMetrics() *Metrics {
return &Metrics{
Peers: discard.NewGauge(),
PeerReceiveBytesTotal: discard.NewCounter(),
PeerSendBytesTotal: discard.NewCounter(),
PeerPendingSendBytes: discard.NewGauge(),
NumTxs: discard.NewGauge(),
Peers: discard.NewGauge(),
PeerReceiveBytesTotal: discard.NewCounter(),
PeerSendBytesTotal: discard.NewCounter(),
PeerPendingSendBytes: discard.NewGauge(),
NumTxs: discard.NewGauge(),
MessageReceiveBytesTotal: discard.NewCounter(),
MessageSendBytesTotal: discard.NewCounter(),
}
}

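As a sketch of how the new per-message-type counters are meant to be labeled, a helper along these lines could sit on the send path. The recordSend name is hypothetical; note the receive path later in this diff still carries a "tmp" placeholder label.

package p2p

import "github.com/cosmos/gogoproto/proto"

// recordSend attributes an outbound message's size to its concrete proto type.
func recordSend(m *Metrics, msg proto.Message, size int) {
	// proto.MessageName yields e.g. "tendermint.p2p.PexAddrs".
	m.MessageSendBytesTotal.
		With("message_type", proto.MessageName(msg)).
		Add(float64(size))
}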
View File

@@ -24,4 +24,8 @@ type Metrics struct {
PeerPendingSendBytes metrics.Gauge `metrics_labels:"peer_id"`
// Number of transactions submitted by each peer.
NumTxs metrics.Gauge `metrics_labels:"peer_id"`
// Number of bytes of each message type received.
MessageReceiveBytesTotal metrics.Counter `metrics_labels:"message_type"`
// Number of bytes of each message type sent.
MessageSendBytesTotal metrics.Counter `metrics_labels:"message_type"`
}

View File

@@ -42,9 +42,9 @@ func NewPeer(ip net.IP) *Peer {
return mp
}
func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error
func (mp *Peer) TrySend(chID byte, msgBytes []byte) bool { return true }
func (mp *Peer) Send(chID byte, msgBytes []byte) bool { return true }
func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error
func (mp *Peer) TrySend(e p2p.Envelope) bool { return true }
func (mp *Peer) Send(e p2p.Envelope) bool { return true }
func (mp *Peer) NodeInfo() p2p.NodeInfo {
return p2p.DefaultNodeInfo{
DefaultNodeID: mp.addr.ID,
@@ -68,3 +68,5 @@ func (mp *Peer) RemoteIP() net.IP { return mp.ip }
func (mp *Peer) SocketAddr() *p2p.NetAddress { return mp.addr }
func (mp *Peer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
func (mp *Peer) CloseConn() error { return nil }
func (mp *Peer) SetRemovalFailed() {}
func (mp *Peer) GetRemovalFailed() bool { return false }

View File

@@ -19,7 +19,7 @@ func NewReactor() *Reactor {
return r
}
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
func (r *Reactor) AddPeer(peer p2p.Peer) {}
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {}
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
func (r *Reactor) AddPeer(peer p2p.Peer) {}
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
func (r *Reactor) Receive(e p2p.Envelope) {}

View File

@@ -53,6 +53,20 @@ func (_m *Peer) Get(_a0 string) interface{} {
return r0
}
// GetRemovalFailed provides a mock function with given fields:
func (_m *Peer) GetRemovalFailed() bool {
ret := _m.Called()
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// ID provides a mock function with given fields:
func (_m *Peer) ID() p2p.ID {
ret := _m.Called()
@@ -220,13 +234,13 @@ func (_m *Peer) Reset() error {
return r0
}
// Send provides a mock function with given fields: _a0, _a1
func (_m *Peer) Send(_a0 byte, _a1 []byte) bool {
ret := _m.Called(_a0, _a1)
// Send provides a mock function with given fields: _a0
func (_m *Peer) Send(_a0 p2p.Envelope) bool {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(byte, []byte) bool); ok {
r0 = rf(_a0, _a1)
if rf, ok := ret.Get(0).(func(p2p.Envelope) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
@@ -244,6 +258,11 @@ func (_m *Peer) SetLogger(_a0 log.Logger) {
_m.Called(_a0)
}
// SetRemovalFailed provides a mock function with given fields:
func (_m *Peer) SetRemovalFailed() {
_m.Called()
}
// SocketAddr provides a mock function with given fields:
func (_m *Peer) SocketAddr() *p2p.NetAddress {
ret := _m.Called()
@@ -316,13 +335,13 @@ func (_m *Peer) String() string {
return r0
}
// TrySend provides a mock function with given fields: _a0, _a1
func (_m *Peer) TrySend(_a0 byte, _a1 []byte) bool {
ret := _m.Called(_a0, _a1)
// TrySend provides a mock function with given fields: _a0
func (_m *Peer) TrySend(_a0 p2p.Envelope) bool {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(byte, []byte) bool); ok {
r0 = rf(_a0, _a1)
if rf, ok := ret.Get(0).(func(p2p.Envelope) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}

View File

@@ -5,6 +5,8 @@ import (
"net"
"time"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/libs/cmap"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
@@ -34,11 +36,14 @@ type Peer interface {
Status() tmconn.ConnectionStatus
SocketAddr() *NetAddress // actual address of the socket
Send(byte, []byte) bool
TrySend(byte, []byte) bool
Send(Envelope) bool
TrySend(Envelope) bool
Set(string, interface{})
Get(string) interface{}
SetRemovalFailed()
GetRemovalFailed() bool
}
//----------------------------------------------------------
@@ -117,6 +122,9 @@ type peer struct {
metrics *Metrics
metricsTicker *time.Ticker
// When removal of a peer fails, we set this flag
removalAttemptFailed bool
}
type PeerOption func(*peer)
@@ -126,6 +134,7 @@ func newPeer(
mConfig tmconn.MConnConfig,
nodeInfo NodeInfo,
reactorsByCh map[byte]Reactor,
msgTypeByChID map[byte]proto.Message,
chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
options ...PeerOption,
@@ -143,6 +152,7 @@ func newPeer(
pc.conn,
p,
reactorsByCh,
msgTypeByChID,
chDescs,
onPeerError,
mConfig,
@@ -243,19 +253,31 @@ func (p *peer) Status() tmconn.ConnectionStatus {
// Send marshals the envelope's message and sends it on the channel identified by
// e.ChannelID. Returns false if the send queue is full after timeout, specified by MConnection.
func (p *peer) Send(chID byte, msgBytes []byte) bool {
func (p *peer) Send(e Envelope) bool {
if !p.IsRunning() {
// see Switch#Broadcast, where we fetch the list of peers and loop over
// them - while we're looping, one peer may be removed and stopped.
return false
} else if !p.hasChannel(chID) {
} else if !p.hasChannel(e.ChannelID) {
return false
}
res := p.mconn.Send(chID, msgBytes)
msg := e.Message
if w, ok := msg.(Wrapper); ok {
var err error
msg, err = w.Wrap()
if err != nil {
panic(err)
}
}
msgBytes, err := proto.Marshal(msg)
if err != nil {
panic(err) // Q: should this panic or error?
}
res := p.mconn.Send(e.ChannelID, msgBytes)
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
"chID", fmt.Sprintf("%#x", e.ChannelID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
}
@@ -264,17 +286,29 @@ func (p *peer) Send(chID byte, msgBytes []byte) bool {
// TrySend marshals the envelope's message and sends it on the channel identified
// by e.ChannelID. Immediately returns false if the send queue is full.
func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
func (p *peer) TrySend(e Envelope) bool {
if !p.IsRunning() {
return false
} else if !p.hasChannel(chID) {
} else if !p.hasChannel(e.ChannelID) {
return false
}
res := p.mconn.TrySend(chID, msgBytes)
msg := e.Message
if w, ok := msg.(Wrapper); ok {
var err error
msg, err = w.Wrap()
if err != nil {
panic(err)
}
}
msgBytes, err := proto.Marshal(msg)
if err != nil {
panic(err)
}
res := p.mconn.TrySend(e.ChannelID, msgBytes)
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
"chID", fmt.Sprintf("%#x", e.ChannelID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
}
@@ -316,6 +350,14 @@ func (p *peer) CloseConn() error {
return p.peerConn.conn.Close()
}
func (p *peer) SetRemovalFailed() {
p.removalAttemptFailed = true
}
func (p *peer) GetRemovalFailed() bool {
return p.removalAttemptFailed
}
//---------------------------------------------------
// methods only used for testing
// TODO: can we remove these?
@@ -370,6 +412,7 @@ func createMConnection(
conn net.Conn,
p *peer,
reactorsByCh map[byte]Reactor,
msgTypeByChID map[byte]proto.Message,
chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
config tmconn.MConnConfig,
@@ -382,12 +425,31 @@ func createMConnection(
// which does onPeerError.
panic(fmt.Sprintf("Unknown channel %X", chID))
}
mt := msgTypeByChID[chID]
msg := proto.Clone(mt)
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
// TODO(williambanfield) add a log line
return
}
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
if w, ok := msg.(Unwrapper); ok {
msg, err = w.Unwrap()
if err != nil {
// TODO(williambanfield) add error log line.
return
}
}
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
reactor.Receive(chID, p, msgBytes)
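// NOTE: the "tmp" label below is a placeholder; presumably it will be replaced with the concrete message type name.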
p.metrics.MessageReceiveBytesTotal.With("message_type", "tmp").Add(float64(len(msgBytes)))
reactor.Receive(Envelope{
ChannelID: chID,
Src: p,
Message: msg,
})
}
onError := func(r interface{}) {

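Send and TrySend above now share the same wrap-then-marshal sequence; extracted into a standalone helper (hypothetical name, same package, proto already imported), that flow reads:

// marshalEnvelope wraps the message if its type implements Wrapper, then
// proto-encodes it, mirroring the logic inlined in Send and TrySend.
func marshalEnvelope(e Envelope) ([]byte, error) {
	msg := e.Message
	if w, ok := msg.(Wrapper); ok {
		wrapped, err := w.Wrap()
		if err != nil {
			return nil, err
		}
		msg = wrapped
	}
	return proto.Marshal(msg)
}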
View File

@@ -47,6 +47,9 @@ func (ps *PeerSet) Add(peer Peer) error {
if ps.lookup[peer.ID()] != nil {
return ErrSwitchDuplicatePeerID{peer.ID()}
}
if peer.GetRemovalFailed() {
return ErrPeerRemoval{}
}
index := len(ps.list)
// Appending is safe even with other goroutines
@@ -107,6 +110,12 @@ func (ps *PeerSet) Remove(peer Peer) bool {
item := ps.lookup[peer.ID()]
if item == nil {
// Removing the peer has failed, so we set a flag to record that a removal was attempted.
// This can happen when the switch's peer-add routine runs in parallel with
// MConn's receive routine: MConn has already errored, but the switch has not
// yet added the peer to the peer set. Setting this flag prevents the peer
// from being added to the node's peer set afterwards.
peer.SetRemovalFailed()
return false
}

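The flag's effect, in miniature: once a removal attempt has failed for a peer, that peer can no longer be added. This sketch assumes a Peer whose SetRemovalFailed/GetRemovalFailed actually store the flag, as the concrete *peer does (the mockPeer stubs in this diff deliberately do not).

// demoRemovalFlag lives in package p2p for illustration only.
func demoRemovalFlag(ps *PeerSet, p Peer) error {
	if !ps.Remove(p) {
		// Remove failed (the peer was never in the set); the peer is now flagged.
		return ps.Add(p) // returns ErrPeerRemoval{}
	}
	return nil
}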
View File

@@ -18,20 +18,22 @@ type mockPeer struct {
id ID
}
func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
func (mp *mockPeer) TrySend(chID byte, msgBytes []byte) bool { return true }
func (mp *mockPeer) Send(chID byte, msgBytes []byte) bool { return true }
func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} }
func (mp *mockPeer) Status() ConnectionStatus { return ConnectionStatus{} }
func (mp *mockPeer) ID() ID { return mp.id }
func (mp *mockPeer) IsOutbound() bool { return false }
func (mp *mockPeer) IsPersistent() bool { return true }
func (mp *mockPeer) Get(s string) interface{} { return s }
func (mp *mockPeer) Set(string, interface{}) {}
func (mp *mockPeer) RemoteIP() net.IP { return mp.ip }
func (mp *mockPeer) SocketAddr() *NetAddress { return nil }
func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
func (mp *mockPeer) CloseConn() error { return nil }
func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
func (mp *mockPeer) TrySend(e Envelope) bool { return true }
func (mp *mockPeer) Send(e Envelope) bool { return true }
func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} }
func (mp *mockPeer) Status() ConnectionStatus { return ConnectionStatus{} }
func (mp *mockPeer) ID() ID { return mp.id }
func (mp *mockPeer) IsOutbound() bool { return false }
func (mp *mockPeer) IsPersistent() bool { return true }
func (mp *mockPeer) Get(s string) interface{} { return s }
func (mp *mockPeer) Set(string, interface{}) {}
func (mp *mockPeer) RemoteIP() net.IP { return mp.ip }
func (mp *mockPeer) SocketAddr() *NetAddress { return nil }
func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
func (mp *mockPeer) CloseConn() error { return nil }
func (mp *mockPeer) SetRemovalFailed() {}
func (mp *mockPeer) GetRemovalFailed() bool { return false }
// Returns a mock peer
func newMockPeer(ip net.IP) *mockPeer {

View File

@@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/cosmos/gogoproto/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -14,6 +15,7 @@ import (
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/proto/tendermint/p2p"
"github.com/tendermint/tendermint/config"
tmconn "github.com/tendermint/tendermint/p2p/conn"
@@ -70,7 +72,7 @@ func TestPeerSend(t *testing.T) {
})
assert.True(p.CanSend(testCh))
assert.True(p.Send(testCh, []byte("Asylum")))
assert.True(p.Send(Envelope{ChannelID: testCh, Message: &p2p.Message{}}))
}
func createOutboundPeerAndPerformHandshake(
@@ -82,6 +84,9 @@ func createOutboundPeerAndPerformHandshake(
{ID: testCh, Priority: 1},
}
reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)}
msgTypeByChID := map[byte]proto.Message{
testCh: &p2p.Message{},
}
pk := ed25519.GenPrivKey()
pc, err := testOutboundPeerConn(addr, config, false, pk)
if err != nil {
@@ -94,7 +99,7 @@ func createOutboundPeerAndPerformHandshake(
return nil, err
}
p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, chDescs, func(p Peer, r interface{}) {})
p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, msgTypeByChID, chDescs, func(p Peer, r interface{}) {})
p.SetLogger(log.TestingLogger().With("peer", addr))
return p, nil
}

View File

@@ -6,8 +6,6 @@ import (
"sync"
"time"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/libs/cmap"
tmmath "github.com/tendermint/tendermint/libs/math"
tmrand "github.com/tendermint/tendermint/libs/rand"
@@ -184,6 +182,7 @@ func (r *Reactor) GetChannels() []*conn.ChannelDescriptor {
Priority: 1,
SendQueueCapacity: 10,
RecvMessageCapacity: maxMsgSize,
MessageType: &tmp2p.Message{},
},
}
}
@@ -236,16 +235,10 @@ func (r *Reactor) logErrAddrBook(err error) {
}
// Receive implements Reactor by handling incoming PEX messages.
func (r *Reactor) Receive(chID byte, src Peer, msgBytes []byte) {
msg, err := decodeMsg(msgBytes)
if err != nil {
r.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
r.Switch.StopPeerForError(src, err)
return
}
r.Logger.Debug("Received message", "src", src, "chId", chID, "msg", msg)
func (r *Reactor) Receive(e p2p.Envelope) {
r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
switch msg := msg.(type) {
switch msg := e.Message.(type) {
case *tmp2p.PexRequest:
// NOTE: this is a prime candidate for amplification attacks,
@@ -255,8 +248,8 @@ func (r *Reactor) Receive(chID byte, src Peer, msgBytes []byte) {
// If we're a seed and this is an inbound peer,
// respond once and disconnect.
if r.config.SeedMode && !src.IsOutbound() {
id := string(src.ID())
if r.config.SeedMode && !e.Src.IsOutbound() {
id := string(e.Src.ID())
v := r.lastReceivedRequests.Get(id)
if v != nil {
// FlushStop/StopPeer are already
@@ -266,36 +259,36 @@ func (r *Reactor) Receive(chID byte, src Peer, msgBytes []byte) {
r.lastReceivedRequests.Set(id, time.Now())
// Send addrs and disconnect
r.SendAddrs(src, r.book.GetSelectionWithBias(biasToSelectNewPeers))
r.SendAddrs(e.Src, r.book.GetSelectionWithBias(biasToSelectNewPeers))
go func() {
// In a go-routine so it doesn't block .Receive.
src.FlushStop()
r.Switch.StopPeerGracefully(src)
e.Src.FlushStop()
r.Switch.StopPeerGracefully(e.Src)
}()
} else {
// Check we're not receiving requests too frequently.
if err := r.receiveRequest(src); err != nil {
r.Switch.StopPeerForError(src, err)
r.book.MarkBad(src.SocketAddr(), defaultBanTime)
if err := r.receiveRequest(e.Src); err != nil {
r.Switch.StopPeerForError(e.Src, err)
r.book.MarkBad(e.Src.SocketAddr(), defaultBanTime)
return
}
r.SendAddrs(src, r.book.GetSelection())
r.SendAddrs(e.Src, r.book.GetSelection())
}
case *tmp2p.PexAddrs:
// If we asked for addresses, add them to the book
addrs, err := p2p.NetAddressesFromProto(msg.Addrs)
if err != nil {
r.Switch.StopPeerForError(src, err)
r.book.MarkBad(src.SocketAddr(), defaultBanTime)
r.Switch.StopPeerForError(e.Src, err)
r.book.MarkBad(e.Src.SocketAddr(), defaultBanTime)
return
}
err = r.ReceiveAddrs(addrs, src)
err = r.ReceiveAddrs(addrs, e.Src)
if err != nil {
r.Switch.StopPeerForError(src, err)
r.Switch.StopPeerForError(e.Src, err)
if err == ErrUnsolicitedList {
r.book.MarkBad(src.SocketAddr(), defaultBanTime)
r.book.MarkBad(e.Src.SocketAddr(), defaultBanTime)
}
return
}
@@ -348,7 +341,10 @@ func (r *Reactor) RequestAddrs(p Peer) {
}
r.Logger.Debug("Request addrs", "from", p)
r.requestsSent.Set(id, struct{}{})
p.Send(PexChannel, mustEncode(&tmp2p.PexRequest{}))
p.Send(p2p.Envelope{
ChannelID: PexChannel,
Message: &tmp2p.PexRequest{},
})
}
// ReceiveAddrs adds the given addrs to the addrbook if theres an open
@@ -406,7 +402,11 @@ func (r *Reactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error {
// SendAddrs sends addrs to the peer.
func (r *Reactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) {
p.Send(PexChannel, mustEncode(&tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)}))
e := p2p.Envelope{
ChannelID: PexChannel,
Message: &tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)},
}
p.Send(e)
}
// SetEnsurePeersPeriod sets period to ensure peers connected.
@@ -763,43 +763,3 @@ func markAddrInBookBasedOnErr(addr *p2p.NetAddress, book AddrBook, err error) {
book.MarkAttempt(addr)
}
}
//-----------------------------------------------------------------------------
// Messages
// mustEncode proto encodes a tmp2p.Message
func mustEncode(pb proto.Message) []byte {
msg := tmp2p.Message{}
switch pb := pb.(type) {
case *tmp2p.PexRequest:
msg.Sum = &tmp2p.Message_PexRequest{PexRequest: pb}
case *tmp2p.PexAddrs:
msg.Sum = &tmp2p.Message_PexAddrs{PexAddrs: pb}
default:
panic(fmt.Sprintf("Unknown message type %T", pb))
}
bz, err := msg.Marshal()
if err != nil {
panic(fmt.Errorf("unable to marshal %T: %w", pb, err))
}
return bz
}
func decodeMsg(bz []byte) (proto.Message, error) {
pb := &tmp2p.Message{}
err := pb.Unmarshal(bz)
if err != nil {
return nil, err
}
switch msg := pb.Sum.(type) {
case *tmp2p.Message_PexRequest:
return msg.PexRequest, nil
case *tmp2p.Message_PexAddrs:
return msg.PexAddrs, nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -131,12 +131,11 @@ func TestPEXReactorReceive(t *testing.T) {
r.RequestAddrs(peer)
size := book.Size()
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
r.Receive(PexChannel, peer, msg)
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
assert.Equal(t, size+1, book.Size())
msg = mustEncode(&tmp2p.PexRequest{})
r.Receive(PexChannel, peer, msg) // should not panic.
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
}
func TestPEXReactorRequestMessageAbuse(t *testing.T) {
@@ -155,20 +154,19 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) {
require.True(t, book.HasAddress(peerAddr))
id := string(peer.ID())
msg := mustEncode(&tmp2p.PexRequest{})
// first time creates the entry
r.Receive(PexChannel, peer, msg)
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
assert.True(t, r.lastReceivedRequests.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// next time sets the last time value
r.Receive(PexChannel, peer, msg)
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
assert.True(t, r.lastReceivedRequests.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// third time is too many too soon - peer is removed
r.Receive(PexChannel, peer, msg)
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
assert.False(t, r.lastReceivedRequests.Has(id))
assert.False(t, sw.Peers().Has(peer.ID()))
assert.True(t, book.IsBanned(peerAddr))
@@ -192,15 +190,15 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
assert.True(t, r.requestsSent.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
// receive some addrs. should clear the request
r.Receive(PexChannel, peer, msg)
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
assert.False(t, r.requestsSent.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// receiving more unsolicited addrs causes a disconnect and ban
r.Receive(PexChannel, peer, msg)
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
assert.False(t, sw.Peers().Has(peer.ID()))
assert.True(t, book.IsBanned(peer.SocketAddr()))
}
@@ -486,8 +484,12 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
pexR.RequestAddrs(peer)
size := book.Size()
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
pexR.Receive(PexChannel, peer, msg)
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
pexR.Receive(p2p.Envelope{
ChannelID: PexChannel,
Src: peer,
Message: msg,
})
assert.Equal(t, size, book.Size())
pexR.AddPeer(peer)
@@ -695,7 +697,10 @@ func TestPexVectors(t *testing.T) {
for _, tc := range testCases {
tc := tc
bz := mustEncode(tc.msg)
w, err := tc.msg.(p2p.Wrapper).Wrap()
require.NoError(t, err)
bz, err := proto.Marshal(w)
require.NoError(t, err)
require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
}

View File

@@ -6,9 +6,9 @@ import (
"sync"
"time"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/cmap"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/p2p/conn"
@@ -69,16 +69,17 @@ type PeerFilterFunc func(IPeerSet, Peer) error
type Switch struct {
service.BaseService
config *config.P2PConfig
reactors map[string]Reactor
chDescs []*conn.ChannelDescriptor
reactorsByCh map[byte]Reactor
peers *PeerSet
dialing *cmap.CMap
reconnecting *cmap.CMap
nodeInfo NodeInfo // our node info
nodeKey *NodeKey // our node privkey
addrBook AddrBook
config *config.P2PConfig
reactors map[string]Reactor
chDescs []*conn.ChannelDescriptor
reactorsByCh map[byte]Reactor
msgTypeByChID map[byte]proto.Message
peers *PeerSet
dialing *cmap.CMap
reconnecting *cmap.CMap
nodeInfo NodeInfo // our node info
nodeKey *NodeKey // our node privkey
addrBook AddrBook
// peers addresses with whom we'll maintain constant connection
persistentPeersAddrs []*NetAddress
unconditionalPeerIDs map[ID]struct{}
@@ -113,6 +114,7 @@ func NewSwitch(
reactors: make(map[string]Reactor),
chDescs: make([]*conn.ChannelDescriptor, 0),
reactorsByCh: make(map[byte]Reactor),
msgTypeByChID: make(map[byte]proto.Message),
peers: NewPeerSet(),
dialing: cmap.NewCMap(),
reconnecting: cmap.NewCMap(),
@@ -164,6 +166,7 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
}
sw.chDescs = append(sw.chDescs, chDesc)
sw.reactorsByCh[chID] = reactor
sw.msgTypeByChID[chID] = chDesc.MessageType
}
sw.reactors[name] = reactor
reactor.SetSwitch(sw)
@@ -182,6 +185,7 @@ func (sw *Switch) RemoveReactor(name string, reactor Reactor) {
}
}
delete(sw.reactorsByCh, chDesc.ID)
delete(sw.msgTypeByChID, chDesc.ID)
}
delete(sw.reactors, name)
reactor.SetSwitch(nil)
@@ -261,8 +265,8 @@ func (sw *Switch) OnStop() {
// closed once the message is sent to all peers (or times out).
//
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", log.NewLazySprintf("%X", msgBytes))
func (sw *Switch) Broadcast(e Envelope) chan bool {
sw.Logger.Debug("Broadcast", "channel", e.ChannelID)
peers := sw.peers.List()
var wg sync.WaitGroup
@@ -272,7 +276,7 @@ func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
for _, peer := range peers {
go func(p Peer) {
defer wg.Done()
success := p.Send(chID, msgBytes)
success := p.Send(e)
successChan <- success
}(peer)
}
@@ -370,6 +374,10 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
// https://github.com/tendermint/tendermint/issues/3338
if sw.peers.Remove(peer) {
sw.metrics.Peers.Add(float64(-1))
} else {
// Removal of the peer has failed. The function above sets a flag within the peer to mark this.
// We log this here as information for the developer.
sw.Logger.Debug("error on peer removal", "peer", peer.ID())
}
}
@@ -619,11 +627,12 @@ func (sw *Switch) IsPeerPersistent(na *NetAddress) bool {
func (sw *Switch) acceptRoutine() {
for {
p, err := sw.transport.Accept(peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
reactorsByCh: sw.reactorsByCh,
metrics: sw.metrics,
isPersistent: sw.IsPeerPersistent,
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
reactorsByCh: sw.reactorsByCh,
msgTypeByChID: sw.msgTypeByChID,
metrics: sw.metrics,
isPersistent: sw.IsPeerPersistent,
})
if err != nil {
switch err := err.(type) {
@@ -722,11 +731,12 @@ func (sw *Switch) addOutboundPeerWithConfig(
}
p, err := sw.transport.Dial(*addr, peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
isPersistent: sw.IsPeerPersistent,
reactorsByCh: sw.reactorsByCh,
metrics: sw.metrics,
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
isPersistent: sw.IsPeerPersistent,
reactorsByCh: sw.reactorsByCh,
msgTypeByChID: sw.msgTypeByChID,
metrics: sw.metrics,
})
if err != nil {
if e, ok := err.(ErrRejected); ok {
@@ -824,6 +834,12 @@ func (sw *Switch) addPeer(p Peer) error {
// so that if Receive errors, we will find the peer and remove it.
// Add should not err since we already checked peers.Has().
if err := sw.peers.Add(p); err != nil {
switch err.(type) {
case ErrPeerRemoval:
sw.Logger.Error("Error starting peer ",
" err ", "Peer has already errored and removal was attempted.",
"peer", p.ID())
}
return err
}
sw.metrics.Peers.Add(float64(1))

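For reference, a caller of the new Broadcast signature looks like this sketch (the function name, channel ID, and message choice are illustrative):

package example

import (
	"github.com/tendermint/tendermint/p2p"
	tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)

// broadcastRequest counts how many peers accepted the message; per-peer
// marshaling now happens inside peer.Send.
func broadcastRequest(sw *p2p.Switch) (successes int) {
	done := sw.Broadcast(p2p.Envelope{
		ChannelID: 0x30, // hypothetical channel
		Message:   &tmp2p.PexRequest{},
	})
	for ok := range done {
		if ok {
			successes++
		}
	}
	return successes
}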
View File

@@ -14,6 +14,7 @@ import (
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -23,6 +24,8 @@ import (
"github.com/tendermint/tendermint/libs/log"
tmsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/p2p/conn"
"github.com/tendermint/tendermint/proto/tendermint/p2p"
p2pproto "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
var (
@@ -36,9 +39,8 @@ func init() {
}
type PeerMessage struct {
PeerID ID
Bytes []byte
Counter int
Contents proto.Message
Counter int
}
type TestReactor struct {
@@ -70,12 +72,12 @@ func (tr *TestReactor) AddPeer(peer Peer) {}
func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {}
func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
func (tr *TestReactor) Receive(e Envelope) {
if tr.logMessages {
tr.mtx.Lock()
defer tr.mtx.Unlock()
// fmt.Printf("Received: %X, %X\n", chID, msgBytes)
tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
fmt.Printf("Received: %X, %X\n", e.ChannelID, e.Message)
tr.msgsReceived[e.ChannelID] = append(tr.msgsReceived[e.ChannelID], PeerMessage{Contents: e.Message, Counter: tr.msgsCounter})
tr.msgsCounter++
}
}
@@ -103,12 +105,12 @@ func initSwitchFunc(i int, sw *Switch) *Switch {
// Make two reactors of two channels each
sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
{ID: byte(0x00), Priority: 10},
{ID: byte(0x01), Priority: 10},
{ID: byte(0x00), Priority: 10, MessageType: &p2pproto.Message{}},
{ID: byte(0x01), Priority: 10, MessageType: &p2pproto.Message{}},
}, true))
sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
{ID: byte(0x02), Priority: 10},
{ID: byte(0x03), Priority: 10},
{ID: byte(0x02), Priority: 10, MessageType: &p2pproto.Message{}},
{ID: byte(0x03), Priority: 10, MessageType: &p2pproto.Message{}},
}, true))
return sw
@@ -135,31 +137,47 @@ func TestSwitches(t *testing.T) {
}
// Lets send some messages
ch0Msg := []byte("channel zero")
ch1Msg := []byte("channel foo")
ch2Msg := []byte("channel bar")
s1.Broadcast(byte(0x00), ch0Msg)
s1.Broadcast(byte(0x01), ch1Msg)
s1.Broadcast(byte(0x02), ch2Msg)
ch0Msg := &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "1",
},
},
}
ch1Msg := &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "1",
},
},
}
ch2Msg := &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "2",
},
},
}
s1.Broadcast(Envelope{ChannelID: byte(0x00), Message: ch0Msg})
s1.Broadcast(Envelope{ChannelID: byte(0x01), Message: ch1Msg})
s1.Broadcast(Envelope{ChannelID: byte(0x02), Message: ch2Msg})
assertMsgReceivedWithTimeout(t,
ch0Msg,
byte(0x00),
s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
s2.Reactor("foo").(*TestReactor), 200*time.Millisecond, 5*time.Second)
assertMsgReceivedWithTimeout(t,
ch1Msg,
byte(0x01),
s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
s2.Reactor("foo").(*TestReactor), 200*time.Millisecond, 5*time.Second)
assertMsgReceivedWithTimeout(t,
ch2Msg,
byte(0x02),
s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
s2.Reactor("bar").(*TestReactor), 200*time.Millisecond, 5*time.Second)
}
func assertMsgReceivedWithTimeout(
t *testing.T,
msgBytes []byte,
msg proto.Message,
channel byte,
reactor *TestReactor,
checkPeriod,
@@ -170,9 +188,13 @@ func assertMsgReceivedWithTimeout(
select {
case <-ticker.C:
msgs := reactor.getMsgs(channel)
if len(msgs) > 0 {
if !bytes.Equal(msgs[0].Bytes, msgBytes) {
t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msgBytes, msgs[0].Bytes)
expectedBytes, err := proto.Marshal(msg)
require.NoError(t, err)
gotBytes, err := proto.Marshal(msgs[0].Contents)
require.NoError(t, err)
if !bytes.Equal(expectedBytes, gotBytes) {
t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", expectedBytes, gotBytes)
}
return
}
@@ -429,7 +451,10 @@ func TestSwitchStopPeerForError(t *testing.T) {
// send messages to the peer from sw1
p := sw1.Peers().List()[0]
p.Send(0x1, []byte("here's a message to send"))
p.Send(Envelope{
ChannelID: 0x1,
Message: &p2p.Message{},
})
// stop sw2. this should cause the p to fail,
// which results in calling StopPeerForError internally
@@ -824,7 +849,7 @@ func BenchmarkSwitchBroadcast(b *testing.B) {
// Send random message from foo channel to another
for i := 0; i < b.N; i++ {
chID := byte(i % 4)
successChan := s1.Broadcast(chID, []byte("test data"))
successChan := s1.Broadcast(Envelope{ChannelID: chID, Message: &p2pproto.PexRequest{}})
for s := range successChan {
if s {
numSuccess++
@@ -836,3 +861,16 @@ func BenchmarkSwitchBroadcast(b *testing.B) {
b.Logf("success: %v, failure: %v", numSuccess, numFailure)
}
func TestSwitchRemovalErr(t *testing.T) {
sw1, sw2 := MakeSwitchPair(t, func(i int, sw *Switch) *Switch {
return initSwitchFunc(i, sw)
})
assert.Equal(t, len(sw1.Peers().List()), 1)
p := sw1.Peers().List()[0]
sw2.StopPeerForError(p, fmt.Errorf("peer should error"))
assert.Equal(t, sw2.peers.Add(p).Error(), ErrPeerRemoval{}.Error())
}

View File

@@ -149,6 +149,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
MConnConfig(sw.config),
ni,
sw.reactorsByCh,
sw.msgTypeByChID,
sw.chDescs,
sw.StopPeerForError,
)

View File

@@ -8,6 +8,7 @@ import (
"golang.org/x/net/netutil"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/libs/protoio"
"github.com/tendermint/tendermint/p2p/conn"
@@ -47,9 +48,10 @@ type peerConfig struct {
// isPersistent allows you to set a function, which, given socket address
// (for outbound peers) OR self-reported address (for inbound peers), tells
// if the peer is persistent or not.
isPersistent func(*NetAddress) bool
reactorsByCh map[byte]Reactor
metrics *Metrics
isPersistent func(*NetAddress) bool
reactorsByCh map[byte]Reactor
msgTypeByChID map[byte]proto.Message
metrics *Metrics
}
// Transport emits and connects to Peers. The implementation of Peer is left to
@@ -519,6 +521,7 @@ func (mt *MultiplexTransport) wrapPeer(
mt.mConfig,
ni,
cfg.reactorsByCh,
cfg.msgTypeByChID,
cfg.chDescs,
cfg.onPeerError,
PeerMetrics(cfg.metrics),

View File

@@ -1,8 +1,40 @@
package p2p
import (
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p/conn"
tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
type ChannelDescriptor = conn.ChannelDescriptor
type ConnectionStatus = conn.ConnectionStatus
// Envelope contains a message with sender routing info.
type Envelope struct {
Src Peer // sender (empty if outbound)
Message proto.Message // message payload
ChannelID byte
}
// Unwrapper is a Protobuf message that can contain a variety of inner messages
// (e.g. via oneof fields). If a channel's message type implements Unwrapper, the
// p2p layer will automatically unwrap inbound messages before delivering them to
// the reactor, so that reactors do not have to do this themselves.
type Unwrapper interface {
proto.Message
// Unwrap will unwrap the inner message contained in this message.
Unwrap() (proto.Message, error)
}
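// Wrapper is a Protobuf message that wraps an inner message in an outer
// container type (e.g. a oneof field) before it is put on the wire.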
type Wrapper interface {
proto.Message
// Wrap will take the underlying message and wrap it in its wrapper type.
Wrap() (proto.Message, error)
}
var (
_ Wrapper = &tmp2p.PexRequest{}
_ Wrapper = &tmp2p.PexAddrs{}
)

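To make the contract concrete, this is the round trip the p2p layer now performs on behalf of reactors, shown with the PEX types asserted above (a sketch, not code from the diff):

package example

import (
	"github.com/cosmos/gogoproto/proto"
	tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)

// roundTrip mirrors peer.Send (wrap, marshal) and the mconn receive callback
// (unmarshal, unwrap).
func roundTrip() (proto.Message, error) {
	out, err := (&tmp2p.PexRequest{}).Wrap() // wraps into *tmp2p.Message
	if err != nil {
		return nil, err
	}
	bz, err := proto.Marshal(out)
	if err != nil {
		return nil, err
	}
	in := &tmp2p.Message{}
	if err := proto.Unmarshal(bz, in); err != nil {
		return nil, err
	}
	return in.Unwrap() // back to *tmp2p.PexRequest
}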
View File

@@ -0,0 +1,73 @@
package blocksync
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p"
)
var _ p2p.Wrapper = &StatusRequest{}
var _ p2p.Wrapper = &StatusResponse{}
var _ p2p.Wrapper = &NoBlockResponse{}
var _ p2p.Wrapper = &BlockResponse{}
var _ p2p.Wrapper = &BlockRequest{}
const (
BlockResponseMessagePrefixSize = 4
BlockResponseMessageFieldKeySize = 1
)
func (m *BlockRequest) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_BlockRequest{BlockRequest: m}
return bm, nil
}
func (m *BlockResponse) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_BlockResponse{BlockResponse: m}
return bm, nil
}
func (m *NoBlockResponse) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_NoBlockResponse{NoBlockResponse: m}
return bm, nil
}
func (m *StatusRequest) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_StatusRequest{StatusRequest: m}
return bm, nil
}
func (m *StatusResponse) Wrap() (proto.Message, error) {
bm := &Message{}
bm.Sum = &Message_StatusResponse{StatusResponse: m}
return bm, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped blockchain
// message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_BlockRequest:
return m.GetBlockRequest(), nil
case *Message_BlockResponse:
return m.GetBlockResponse(), nil
case *Message_NoBlockResponse:
return m.GetNoBlockResponse(), nil
case *Message_StatusRequest:
return m.GetStatusRequest(), nil
case *Message_StatusResponse:
return m.GetStatusResponse(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -0,0 +1,109 @@
package consensus
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p"
)
var _ p2p.Wrapper = &VoteSetBits{}
var _ p2p.Wrapper = &VoteSetMaj23{}
var _ p2p.Wrapper = &Vote{}
var _ p2p.Wrapper = &ProposalPOL{}
var _ p2p.Wrapper = &Proposal{}
var _ p2p.Wrapper = &NewValidBlock{}
var _ p2p.Wrapper = &NewRoundStep{}
var _ p2p.Wrapper = &HasVote{}
var _ p2p.Wrapper = &BlockPart{}
func (m *VoteSetBits) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_VoteSetBits{VoteSetBits: m}
return cm, nil
}
func (m *VoteSetMaj23) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_VoteSetMaj23{VoteSetMaj23: m}
return cm, nil
}
func (m *HasVote) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_HasVote{HasVote: m}
return cm, nil
}
func (m *Vote) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_Vote{Vote: m}
return cm, nil
}
func (m *BlockPart) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_BlockPart{BlockPart: m}
return cm, nil
}
func (m *ProposalPOL) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_ProposalPol{ProposalPol: m}
return cm, nil
}
func (m *Proposal) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_Proposal{Proposal: m}
return cm, nil
}
func (m *NewValidBlock) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_NewValidBlock{NewValidBlock: m}
return cm, nil
}
func (m *NewRoundStep) Wrap() (proto.Message, error) {
cm := &Message{}
cm.Sum = &Message_NewRoundStep{NewRoundStep: m}
return cm, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped consensus
// proto message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_NewRoundStep:
return m.GetNewRoundStep(), nil
case *Message_NewValidBlock:
return m.GetNewValidBlock(), nil
case *Message_Proposal:
return m.GetProposal(), nil
case *Message_ProposalPol:
return m.GetProposalPol(), nil
case *Message_BlockPart:
return m.GetBlockPart(), nil
case *Message_Vote:
return m.GetVote(), nil
case *Message_HasVote:
return m.GetHasVote(), nil
case *Message_VoteSetMaj23:
return m.GetVoteSetMaj23(), nil
case *Message_VoteSetBits:
return m.GetVoteSetBits(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -0,0 +1,30 @@
package mempool
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p"
)
var _ p2p.Wrapper = &Txs{}
var _ p2p.Unwrapper = &Message{}
// Wrap implements the p2p Wrapper interface and wraps a mempool message.
func (m *Txs) Wrap() (proto.Message, error) {
return &Message{
Sum: &Message_Txs{Txs: m},
}, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped mempool
// message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_Txs:
return m.GetTxs(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -0,0 +1,32 @@
package p2p
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
)
func (m *PexAddrs) Wrap() (proto.Message, error) {
pm := &Message{}
pm.Sum = &Message_PexAddrs{PexAddrs: m}
return pm, nil
}
func (m *PexRequest) Wrap() (proto.Message, error) {
pm := &Message{}
pm.Sum = &Message_PexRequest{PexRequest: m}
return pm, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped PEX
// message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_PexRequest:
return msg.PexRequest, nil
case *Message_PexAddrs:
return msg.PexAddrs, nil
default:
return nil, fmt.Errorf("unknown pex message: %T", msg)
}
}

View File

@@ -0,0 +1,58 @@
package statesync
import (
"fmt"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/p2p"
)
var _ p2p.Wrapper = &ChunkRequest{}
var _ p2p.Wrapper = &ChunkResponse{}
var _ p2p.Wrapper = &SnapshotsRequest{}
var _ p2p.Wrapper = &SnapshotsResponse{}
func (m *SnapshotsResponse) Wrap() (proto.Message, error) {
sm := &Message{}
sm.Sum = &Message_SnapshotsResponse{SnapshotsResponse: m}
return sm, nil
}
func (m *SnapshotsRequest) Wrap() (proto.Message, error) {
sm := &Message{}
sm.Sum = &Message_SnapshotsRequest{SnapshotsRequest: m}
return sm, nil
}
func (m *ChunkResponse) Wrap() (proto.Message, error) {
sm := &Message{}
sm.Sum = &Message_ChunkResponse{ChunkResponse: m}
return sm, nil
}
func (m *ChunkRequest) Wrap() (proto.Message, error) {
sm := &Message{}
sm.Sum = &Message_ChunkRequest{ChunkRequest: m}
return sm, nil
}
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped state sync
// proto message.
func (m *Message) Unwrap() (proto.Message, error) {
switch msg := m.Sum.(type) {
case *Message_ChunkRequest:
return m.GetChunkRequest(), nil
case *Message_ChunkResponse:
return m.GetChunkResponse(), nil
case *Message_SnapshotsRequest:
return m.GetSnapshotsRequest(), nil
case *Message_SnapshotsResponse:
return m.GetSnapshotsResponse(), nil
default:
return nil, fmt.Errorf("unknown message: %T", msg)
}
}

View File

@@ -131,13 +131,14 @@ object nset = { # the type of node sets
object classic_bft = {
relation quorum_intersection
private {
definition [quorum_intersection_def] quorum_intersection = forall Q1,Q2. exists N. well_behaved(N) & nset.member(N, Q1) & nset.member(N, Q2) # every two quorums have a well-behaved node in common
definition [quorum_intersection_def] quorum_intersection = forall Q1,Q2. nset.is_quorum(Q1) & nset.is_quorum(Q2)
-> exists N. well_behaved(N) & nset.member(N, Q1) & nset.member(N, Q2) # every two quorums have a well-behaved node in common
}
}
trusted isolate accountable_bft = {
# this is our baseline assumption about quorums:
private {
property [max_2f_byzantine] exists N . well_behaved(N) & nset.member(N,Q) # every quorum has a well-behaved member
property [max_2f_byzantine] nset.is_quorum(Q) -> exists N . well_behaved(N) & nset.member(N,Q) # every quorum has a well-behaved member
}
}

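In first-order notation, the corrected definitions guard both quantifiers with the quorum predicate; the unguarded originals ranged over arbitrary node sets, requiring even non-quorum sets (e.g. the empty set) to contain or intersect in a well-behaved node. Transcribed to LaTeX from the Ivy above:

\forall Q_1, Q_2.\; \mathrm{is\_quorum}(Q_1) \land \mathrm{is\_quorum}(Q_2) \rightarrow \exists N.\; \mathrm{well\_behaved}(N) \land N \in Q_1 \land N \in Q_2

\forall Q.\; \mathrm{is\_quorum}(Q) \rightarrow \exists N.\; \mathrm{well\_behaved}(N) \land N \in Q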
View File

@@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
// Register the Postgres database driver.
@@ -196,6 +197,55 @@ func TestIndexing(t *testing.T) {
err = indexer.IndexTxEvents([]*abci.TxResult{txResult})
require.NoError(t, err)
})
t.Run("IndexerService", func(t *testing.T) {
indexer := &EventSink{store: testDB(), chainID: chainID}
// event bus
eventBus := types.NewEventBus()
err := eventBus.Start()
require.NoError(t, err)
t.Cleanup(func() {
if err := eventBus.Stop(); err != nil {
t.Error(err)
}
})
service := txindex.NewIndexerService(indexer.TxIndexer(), indexer.BlockIndexer(), eventBus, true)
err = service.Start()
require.NoError(t, err)
t.Cleanup(func() {
if err := service.Stop(); err != nil {
t.Error(err)
}
})
// publish block with txs
err = eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
Header: types.Header{Height: 1},
NumTxs: int64(2),
})
require.NoError(t, err)
txResult1 := &abci.TxResult{
Height: 1,
Index: uint32(0),
Tx: types.Tx("foo"),
Result: abci.ResponseDeliverTx{Code: 0},
}
err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult1})
require.NoError(t, err)
txResult2 := &abci.TxResult{
Height: 1,
Index: uint32(1),
Tx: types.Tx("bar"),
Result: abci.ResponseDeliverTx{Code: 1},
}
err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult2})
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
require.True(t, service.IsRunning())
})
}
func TestStop(t *testing.T) {

View File

@@ -3,7 +3,6 @@ package txindex
import (
"context"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
@@ -20,9 +19,10 @@ const (
type IndexerService struct {
service.BaseService
txIdxr TxIndexer
blockIdxr indexer.BlockIndexer
eventBus *types.EventBus
txIdxr TxIndexer
blockIdxr indexer.BlockIndexer
eventBus *types.EventBus
terminateOnError bool
}
// NewIndexerService returns a new service instance.
@@ -30,9 +30,10 @@ func NewIndexerService(
txIdxr TxIndexer,
blockIdxr indexer.BlockIndexer,
eventBus *types.EventBus,
terminateOnError bool,
) *IndexerService {
is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus}
is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus, terminateOnError: terminateOnError}
is.BaseService = *service.NewBaseService(nil, "IndexerService", is)
return is
}
@@ -74,24 +75,38 @@ func (is *IndexerService) OnStart() error {
"index", txResult.Index,
"err", err,
)
if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
}
return
}
}
}
if err := is.blockIdxr.Index(eventDataHeader); err != nil {
is.Logger.Error("failed to index block", "height", height, "err", err)
if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
}
return
}
} else {
is.Logger.Info("indexed block", "height", height)
}
batch.Ops, err = DeduplicateBatch(batch.Ops, is.txIdxr)
if err != nil {
is.Logger.Error("deduplicate batch", "height", height)
is.Logger.Info("indexed block exents", "height", height)
}
if err = is.txIdxr.AddBatch(batch); err != nil {
is.Logger.Error("failed to index block txs", "height", height, "err", err)
if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
}
return
}
} else {
is.Logger.Debug("indexed block txs", "height", height, "num_txs", eventDataHeader.NumTxs)
is.Logger.Debug("indexed transactions", "height", height, "num_txs", eventDataHeader.NumTxs)
}
}
}()
@@ -104,45 +119,3 @@ func (is *IndexerService) OnStop() {
_ = is.eventBus.UnsubscribeAll(context.Background(), subscriber)
}
}
// DeduplicateBatch consider the case of duplicate txs.
// if the current one under investigation is NOT OK, then we need to check
// whether there's a previously indexed tx.
// SKIP the current tx if the previously indexed record is found and successful.
func DeduplicateBatch(ops []*abci.TxResult, txIdxr TxIndexer) ([]*abci.TxResult, error) {
result := make([]*abci.TxResult, 0, len(ops))
// keep track of successful txs in this block in order to suppress latter ones being indexed.
var successfulTxsInThisBlock = make(map[string]struct{})
for _, txResult := range ops {
hash := types.Tx(txResult.Tx).Hash()
if txResult.Result.IsOK() {
successfulTxsInThisBlock[string(hash)] = struct{}{}
} else {
// if it already appeared in current block and was successful, skip.
if _, found := successfulTxsInThisBlock[string(hash)]; found {
continue
}
// check if this tx hash is already indexed
old, err := txIdxr.Get(hash)
// if db op errored
// Not found is not an error
if err != nil {
return nil, err
}
// if it's already indexed in an older block and was successful, skip.
if old != nil && old.Result.Code == abci.CodeTypeOK {
continue
}
}
result = append(result, txResult)
}
return result, nil
}

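A sketch of how a caller opts into the new termination behavior (mirroring the test and node wiring elsewhere in this diff; the argument values come from whatever node construction is in scope):

package example

import (
	"github.com/tendermint/tendermint/state/indexer"
	"github.com/tendermint/tendermint/state/txindex"
	"github.com/tendermint/tendermint/types"
)

// startIndexer passes true for terminateOnError: the service now stops on the
// first indexing failure instead of logging the error and carrying on.
func startIndexer(txIdxr txindex.TxIndexer, blockIdxr indexer.BlockIndexer, bus *types.EventBus) error {
	svc := txindex.NewIndexerService(txIdxr, blockIdxr, bus, true)
	return svc.Start()
}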
View File

@@ -32,7 +32,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
txIndexer := kv.NewTxIndex(store)
blockIndexer := blockidxkv.New(db.NewPrefixDB(store, []byte("block_events")))
service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
service.SetLogger(log.TestingLogger())
err = service.Start()
require.NoError(t, err)
@@ -79,164 +79,3 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
require.NoError(t, err)
require.Equal(t, txResult2, res)
}
func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) {
var mockTx = types.Tx("MOCK_TX_HASH")
testCases := []struct {
name string
tx1 abci.TxResult
tx2 abci.TxResult
expSkip bool // do we expect the second tx to be skipped by tx indexer
}{
{"skip, previously successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
true,
},
{"not skip, previously unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
false,
},
{"not skip, both successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
false,
},
{"not skip, both unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
false,
},
{"skip, same block, previously successful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
true,
},
{"not skip, same block, previously unsuccessful",
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
indexer := kv.NewTxIndex(db.NewMemDB())
if tc.tx1.Height != tc.tx2.Height {
// index the first tx
err := indexer.AddBatch(&txindex.Batch{
Ops: []*abci.TxResult{&tc.tx1},
})
require.NoError(t, err)
// check if the second one should be skipped.
ops, err := txindex.DeduplicateBatch([]*abci.TxResult{&tc.tx2}, indexer)
require.NoError(t, err)
if tc.expSkip {
require.Empty(t, ops)
} else {
require.Equal(t, []*abci.TxResult{&tc.tx2}, ops)
}
} else {
// same block
ops := []*abci.TxResult{&tc.tx1, &tc.tx2}
ops, err := txindex.DeduplicateBatch(ops, indexer)
require.NoError(t, err)
if tc.expSkip {
// the second one is skipped
require.Equal(t, []*abci.TxResult{&tc.tx1}, ops)
} else {
require.Equal(t, []*abci.TxResult{&tc.tx1, &tc.tx2}, ops)
}
}
})
}
}

View File

@@ -102,12 +102,30 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
// that indexed from the tx's events is a composite of the event type and the
// respective attribute's key delimited by a "." (eg. "account.number").
// Any event with an empty type is not indexed.
//
// If a transaction is indexed with the same hash as a previous transaction, it
// overwrites the previous one unless the new result was NOT OK while the prior
// result was OK. That is, successful transactions overwrite failed or older
// successful ones, but a failed transaction never overwrites a successful one.
func (txi *TxIndex) Index(result *abci.TxResult) error {
b := txi.store.NewBatch()
defer b.Close()
hash := types.Tx(result.Tx).Hash()
if !result.Result.IsOK() {
oldResult, err := txi.Get(hash)
if err != nil {
return err
}
// If the new transaction failed but the same hash was already indexed in an
// older block where it succeeded, skip it so that queries return the older
// successful transaction.
if oldResult != nil && oldResult.Result.Code == abci.CodeTypeOK {
return nil
}
}
// index tx by events
err := txi.indexEvents(result, hash, b)
if err != nil {

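The guard's effect in miniature (a sketch in package kv, assuming the tm-db in-memory store used by the tests in this diff; the table-driven test later in the diff exercises the same cases):

package kv

import (
	db "github.com/tendermint/tm-db"

	abci "github.com/tendermint/tendermint/abci/types"
	"github.com/tendermint/tendermint/types"
)

// demoOverwriteGuard shows that a failed re-index of the same hash no longer
// clobbers an earlier successful result.
func demoOverwriteGuard() (*abci.TxResult, error) {
	idx := NewTxIndex(db.NewMemDB())
	tx := types.Tx("MOCK_TX")
	ok := &abci.TxResult{Height: 1, Tx: tx, Result: abci.ResponseDeliverTx{Code: abci.CodeTypeOK}}
	fail := &abci.TxResult{Height: 2, Tx: tx, Result: abci.ResponseDeliverTx{Code: abci.CodeTypeOK + 1}}
	if err := idx.Index(ok); err != nil {
		return nil, err
	}
	if err := idx.Index(fail); err != nil { // silently skipped by the new guard
		return nil, err
	}
	return idx.Get(tx.Hash()) // still the successful result from height 1
}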
View File

@@ -258,6 +258,103 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {
}
}
func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) {
var mockTx = types.Tx("MOCK_TX_HASH")
testCases := []struct {
name string
tx1 *abci.TxResult
tx2 *abci.TxResult
expOverwrite bool // do we expect the second tx to overwrite the first tx
}{
{
"don't overwrite as a non-zero code was returned and the previous tx was successful",
&abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
&abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
false,
},
{
"overwrite as the previous tx was also unsuccessful",
&abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
&abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK + 1,
},
},
true,
},
{
"overwrite as the most recent tx was successful",
&abci.TxResult{
Height: 1,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
&abci.TxResult{
Height: 2,
Index: 0,
Tx: mockTx,
Result: abci.ResponseDeliverTx{
Code: abci.CodeTypeOK,
},
},
true,
},
}
hash := mockTx.Hash()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
indexer := NewTxIndex(db.NewMemDB())
// index the first tx
err := indexer.Index(tc.tx1)
require.NoError(t, err)
// index the same tx with different results
err = indexer.Index(tc.tx2)
require.NoError(t, err)
res, err := indexer.Get(hash)
require.NoError(t, err)
if tc.expOverwrite {
require.Equal(t, tc.tx2, res)
} else {
require.Equal(t, tc.tx1, res)
}
})
}
}
func TestTxSearchMultipleTxs(t *testing.T) {
indexer := NewTxIndex(db.NewMemDB())

View File

@@ -16,49 +16,6 @@ const (
chunkMsgSize = int(16e6)
)
// mustEncodeMsg encodes a Protobuf message, panicing on error.
func mustEncodeMsg(pb proto.Message) []byte {
msg := ssproto.Message{}
switch pb := pb.(type) {
case *ssproto.ChunkRequest:
msg.Sum = &ssproto.Message_ChunkRequest{ChunkRequest: pb}
case *ssproto.ChunkResponse:
msg.Sum = &ssproto.Message_ChunkResponse{ChunkResponse: pb}
case *ssproto.SnapshotsRequest:
msg.Sum = &ssproto.Message_SnapshotsRequest{SnapshotsRequest: pb}
case *ssproto.SnapshotsResponse:
msg.Sum = &ssproto.Message_SnapshotsResponse{SnapshotsResponse: pb}
default:
panic(fmt.Errorf("unknown message type %T", pb))
}
bz, err := msg.Marshal()
if err != nil {
panic(fmt.Errorf("unable to marshal %T: %w", pb, err))
}
return bz
}
// decodeMsg decodes a Protobuf message.
func decodeMsg(bz []byte) (proto.Message, error) {
pb := &ssproto.Message{}
err := proto.Unmarshal(bz, pb)
if err != nil {
return nil, err
}
switch msg := pb.Sum.(type) {
case *ssproto.Message_ChunkRequest:
return msg.ChunkRequest, nil
case *ssproto.Message_ChunkResponse:
return msg.ChunkResponse, nil
case *ssproto.Message_SnapshotsRequest:
return msg.SnapshotsRequest, nil
case *ssproto.Message_SnapshotsResponse:
return msg.SnapshotsResponse, nil
default:
return nil, fmt.Errorf("unknown message type %T", msg)
}
}
// validateMsg validates a message.
func validateMsg(pb proto.Message) error {
if pb == nil {

View File

@@ -7,6 +7,7 @@ import (
"github.com/cosmos/gogoproto/proto"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/p2p"
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
)
@@ -99,8 +100,10 @@ func TestStateSyncVectors(t *testing.T) {
for _, tc := range testCases {
tc := tc
bz := mustEncodeMsg(tc.msg)
w, err := tc.msg.(p2p.Wrapper).Wrap()
require.NoError(t, err)
bz, err := proto.Marshal(w)
require.NoError(t, err)
require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
}

View File

@@ -66,12 +66,14 @@ func (r *Reactor) GetChannels() []*p2p.ChannelDescriptor {
 			Priority:            5,
 			SendQueueCapacity:   10,
 			RecvMessageCapacity: snapshotMsgSize,
+			MessageType:         &ssproto.Message{},
 		},
 		{
 			ID:                  ChunkChannel,
 			Priority:            3,
 			SendQueueCapacity:   10,
 			RecvMessageCapacity: chunkMsgSize,
+			MessageType:         &ssproto.Message{},
 		},
 	}
 }
@@ -100,27 +102,21 @@ func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
 }
 
 // Receive implements p2p.Reactor.
-func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
+func (r *Reactor) Receive(e p2p.Envelope) {
 	if !r.IsRunning() {
 		return
 	}
 
-	msg, err := decodeMsg(msgBytes)
+	err := validateMsg(e.Message)
 	if err != nil {
-		r.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
-		r.Switch.StopPeerForError(src, err)
-		return
-	}
-	err = validateMsg(msg)
-	if err != nil {
-		r.Logger.Error("Invalid message", "peer", src, "msg", msg, "err", err)
-		r.Switch.StopPeerForError(src, err)
+		r.Logger.Error("Invalid message", "peer", e.Src, "msg", e.Message, "err", err)
+		r.Switch.StopPeerForError(e.Src, err)
 		return
 	}
 
-	switch chID {
+	switch e.ChannelID {
 	case SnapshotChannel:
-		switch msg := msg.(type) {
+		switch msg := e.Message.(type) {
 		case *ssproto.SnapshotsRequest:
 			snapshots, err := r.recentSnapshots(recentSnapshots)
 			if err != nil {
@@ -129,14 +125,17 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
 			}
 			for _, snapshot := range snapshots {
 				r.Logger.Debug("Advertising snapshot", "height", snapshot.Height,
-					"format", snapshot.Format, "peer", src.ID())
-				src.Send(chID, mustEncodeMsg(&ssproto.SnapshotsResponse{
-					Height:   snapshot.Height,
-					Format:   snapshot.Format,
-					Chunks:   snapshot.Chunks,
-					Hash:     snapshot.Hash,
-					Metadata: snapshot.Metadata,
-				}))
+					"format", snapshot.Format, "peer", e.Src.ID())
+				e.Src.Send(p2p.Envelope{
+					ChannelID: e.ChannelID,
+					Message: &ssproto.SnapshotsResponse{
+						Height:   snapshot.Height,
+						Format:   snapshot.Format,
+						Chunks:   snapshot.Chunks,
+						Hash:     snapshot.Hash,
+						Metadata: snapshot.Metadata,
+					},
+				})
 			}
 
 		case *ssproto.SnapshotsResponse:
@@ -146,8 +145,8 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
 				r.Logger.Debug("Received unexpected snapshot, no state sync in progress")
 				return
 			}
-			r.Logger.Debug("Received snapshot", "height", msg.Height, "format", msg.Format, "peer", src.ID())
-			_, err := r.syncer.AddSnapshot(src, &snapshot{
+			r.Logger.Debug("Received snapshot", "height", msg.Height, "format", msg.Format, "peer", e.Src.ID())
+			_, err := r.syncer.AddSnapshot(e.Src, &snapshot{
 				Height:   msg.Height,
 				Format:   msg.Format,
 				Chunks:   msg.Chunks,
@@ -157,7 +156,7 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
 			// TODO: We may want to consider punishing the peer for certain errors
 			if err != nil {
 				r.Logger.Error("Failed to add snapshot", "height", msg.Height, "format", msg.Format,
-					"peer", src.ID(), "err", err)
+					"peer", e.Src.ID(), "err", err)
 				return
 			}
@@ -166,10 +165,10 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
 		}
 
 	case ChunkChannel:
-		switch msg := msg.(type) {
+		switch msg := e.Message.(type) {
 		case *ssproto.ChunkRequest:
 			r.Logger.Debug("Received chunk request", "height", msg.Height, "format", msg.Format,
-				"chunk", msg.Index, "peer", src.ID())
+				"chunk", msg.Index, "peer", e.Src.ID())
 			resp, err := r.conn.LoadSnapshotChunkSync(abci.RequestLoadSnapshotChunk{
 				Height: msg.Height,
 				Format: msg.Format,
@@ -181,30 +180,33 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
 				return
 			}
 			r.Logger.Debug("Sending chunk", "height", msg.Height, "format", msg.Format,
-				"chunk", msg.Index, "peer", src.ID())
-			src.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkResponse{
-				Height:  msg.Height,
-				Format:  msg.Format,
-				Index:   msg.Index,
-				Chunk:   resp.Chunk,
-				Missing: resp.Chunk == nil,
-			}))
+				"chunk", msg.Index, "peer", e.Src.ID())
+			e.Src.Send(p2p.Envelope{
+				ChannelID: ChunkChannel,
+				Message: &ssproto.ChunkResponse{
+					Height:  msg.Height,
+					Format:  msg.Format,
+					Index:   msg.Index,
+					Chunk:   resp.Chunk,
+					Missing: resp.Chunk == nil,
				},
+			})
 
 		case *ssproto.ChunkResponse:
 			r.mtx.RLock()
 			defer r.mtx.RUnlock()
 			if r.syncer == nil {
-				r.Logger.Debug("Received unexpected chunk, no state sync in progress", "peer", src.ID())
+				r.Logger.Debug("Received unexpected chunk, no state sync in progress", "peer", e.Src.ID())
 				return
 			}
 			r.Logger.Debug("Received chunk, adding to sync", "height", msg.Height, "format", msg.Format,
-				"chunk", msg.Index, "peer", src.ID())
+				"chunk", msg.Index, "peer", e.Src.ID())
 			_, err := r.syncer.AddChunk(&chunk{
 				Height: msg.Height,
 				Format: msg.Format,
 				Index:  msg.Index,
 				Chunk:  msg.Chunk,
-				Sender: src.ID(),
+				Sender: e.Src.ID(),
 			})
 			if err != nil {
 				r.Logger.Error("Failed to add chunk", "height", msg.Height, "format", msg.Format,
@@ -217,7 +219,7 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
 		}
 
 	default:
-		r.Logger.Error("Received message on invalid channel %x", chID)
+		r.Logger.Error("Received message on invalid channel %x", e.ChannelID)
 	}
 }
@@ -269,7 +271,11 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
 	hook := func() {
 		r.Logger.Debug("Requesting snapshots from known peers")
 		// Request snapshots from all currently connected peers
-		r.Switch.Broadcast(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
+		r.Switch.NewBroadcast(p2p.Envelope{
+			ChannelID: SnapshotChannel,
+			Message:   &ssproto.SnapshotsRequest{},
+		})
 	}
 	hook()
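With every channel descriptor advertising a MessageType, the decode step that decodeMsg used to perform can happen once, generically, before the reactor is invoked. A hypothetical sketch of that path (helper name and placement invented for illustration; the real logic lives inside the p2p package):

	// deliverToReactor decodes raw wire bytes into the channel's declared
	// message type, unwraps the oneof container if the type supports it,
	// and hands the reactor a ready-to-use Envelope.
	func deliverToReactor(r p2p.Reactor, chDesc *p2p.ChannelDescriptor, src p2p.Peer, msgBytes []byte) error {
		msg := proto.Clone(chDesc.MessageType) // fresh instance of the channel's type
		if err := proto.Unmarshal(msgBytes, msg); err != nil {
			return err
		}
		// Surface the concrete inner message (e.g. *ssproto.ChunkRequest) so
		// reactors can switch on e.Message directly.
		if unwrapper, ok := msg.(p2p.Unwrapper); ok {
			inner, err := unwrapper.Unwrap()
			if err != nil {
				return err
			}
			msg = inner
		}
		r.Receive(p2p.Envelope{ChannelID: chDesc.ID, Src: src, Message: msg})
		return nil
	}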


@@ -4,6 +4,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/cosmos/gogoproto/proto"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
@@ -53,10 +54,18 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
 			peer.On("ID").Return(p2p.ID("id"))
 			var response *ssproto.ChunkResponse
 			if tc.expectResponse != nil {
-				peer.On("Send", ChunkChannel, mock.Anything).Run(func(args mock.Arguments) {
-					msg, err := decodeMsg(args[1].([]byte))
+				peer.On("Send", mock.MatchedBy(func(i interface{}) bool {
+					e, ok := i.(p2p.Envelope)
+					return ok && e.ChannelID == ChunkChannel
+				})).Run(func(args mock.Arguments) {
+					e := args[0].(p2p.Envelope)
+					// Marshal to simulate a wire roundtrip.
+					bz, err := proto.Marshal(e.Message)
 					require.NoError(t, err)
-					response = msg.(*ssproto.ChunkResponse)
+					err = proto.Unmarshal(bz, e.Message)
+					require.NoError(t, err)
+					response = e.Message.(*ssproto.ChunkResponse)
 				}).Return(true)
 			}
@@ -71,7 +80,11 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
 				}
 			})
 
-			r.Receive(ChunkChannel, peer, mustEncodeMsg(tc.request))
+			r.Receive(p2p.Envelope{
+				ChannelID: ChunkChannel,
+				Src:       peer,
+				Message:   tc.request,
+			})
 			time.Sleep(100 * time.Millisecond)
 			assert.Equal(t, tc.expectResponse, response)
@@ -131,10 +144,18 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
 			peer := &p2pmocks.Peer{}
 			if len(tc.expectResponses) > 0 {
 				peer.On("ID").Return(p2p.ID("id"))
-				peer.On("Send", SnapshotChannel, mock.Anything).Run(func(args mock.Arguments) {
-					msg, err := decodeMsg(args[1].([]byte))
+				peer.On("Send", mock.MatchedBy(func(i interface{}) bool {
+					e, ok := i.(p2p.Envelope)
+					return ok && e.ChannelID == SnapshotChannel
+				})).Run(func(args mock.Arguments) {
+					e := args[0].(p2p.Envelope)
+					// Marshal to simulate a wire roundtrip.
+					bz, err := proto.Marshal(e.Message)
 					require.NoError(t, err)
-					responses = append(responses, msg.(*ssproto.SnapshotsResponse))
+					err = proto.Unmarshal(bz, e.Message)
+					require.NoError(t, err)
+					responses = append(responses, e.Message.(*ssproto.SnapshotsResponse))
 				}).Return(true)
 			}
@@ -149,7 +170,11 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
 				}
 			})
 
-			r.Receive(SnapshotChannel, peer, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
+			r.Receive(p2p.Envelope{
+				ChannelID: SnapshotChannel,
+				Src:       peer,
+				Message:   &ssproto.SnapshotsRequest{},
+			})
 			time.Sleep(100 * time.Millisecond)
 			assert.Equal(t, tc.expectResponses, responses)
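Since the same Envelope matcher is spelled out repeatedly across these tests, it could be factored into a tiny helper; a hypothetical convenience, not part of this change:

	// envelopeOn matches any p2p.Envelope sent on the given channel.
	func envelopeOn(chID byte) interface{} {
		return mock.MatchedBy(func(i interface{}) bool {
			e, ok := i.(p2p.Envelope)
			return ok && e.ChannelID == chID
		})
	}

	// usage: peer.On("Send", envelopeOn(ChunkChannel)).Return(true)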


@@ -126,7 +126,11 @@ func (s *syncer) AddSnapshot(peer p2p.Peer, snapshot *snapshot) (bool, error) {
 // to discover snapshots, later we may want to do retries and stuff.
 func (s *syncer) AddPeer(peer p2p.Peer) {
 	s.logger.Debug("Requesting snapshots from peer", "peer", peer.ID())
-	peer.Send(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
+	e := p2p.Envelope{
+		ChannelID: SnapshotChannel,
+		Message:   &ssproto.SnapshotsRequest{},
+	}
+	peer.Send(e)
 }
 
 // RemovePeer removes a peer from the pool.
@@ -467,11 +471,14 @@ func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
 	}
 
 	s.logger.Debug("Requesting snapshot chunk", "height", snapshot.Height,
 		"format", snapshot.Format, "chunk", chunk, "peer", peer.ID())
-	peer.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkRequest{
-		Height: snapshot.Height,
-		Format: snapshot.Format,
-		Index:  chunk,
-	}))
+	peer.Send(p2p.Envelope{
+		ChannelID: ChunkChannel,
+		Message: &ssproto.ChunkRequest{
+			Height: snapshot.Height,
+			Format: snapshot.Format,
+			Index:  chunk,
+		},
+	})
 }
 
 // verifyApp verifies the sync, checking the app hash, last block height and app version


@@ -98,13 +98,27 @@ func TestSyncer_SyncAny(t *testing.T) {
 	// Adding a couple of peers should trigger snapshot discovery messages
 	peerA := &p2pmocks.Peer{}
 	peerA.On("ID").Return(p2p.ID("a"))
-	peerA.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true)
+	peerA.On("Send", mock.MatchedBy(func(i interface{}) bool {
+		e, ok := i.(p2p.Envelope)
+		if !ok {
+			return false
+		}
+		req, ok := e.Message.(*ssproto.SnapshotsRequest)
+		return ok && e.ChannelID == SnapshotChannel && req != nil
+	})).Return(true)
 	syncer.AddPeer(peerA)
 	peerA.AssertExpectations(t)
 
 	peerB := &p2pmocks.Peer{}
 	peerB.On("ID").Return(p2p.ID("b"))
-	peerB.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true)
+	peerB.On("Send", mock.MatchedBy(func(i interface{}) bool {
+		e, ok := i.(p2p.Envelope)
+		if !ok {
+			return false
+		}
+		req, ok := e.Message.(*ssproto.SnapshotsRequest)
+		return ok && e.ChannelID == SnapshotChannel && req != nil
+	})).Return(true)
 	syncer.AddPeer(peerB)
 	peerB.AssertExpectations(t)
@@ -147,9 +161,9 @@ func TestSyncer_SyncAny(t *testing.T) {
 	chunkRequests := make(map[uint32]int)
 	chunkRequestsMtx := tmsync.Mutex{}
 	onChunkRequest := func(args mock.Arguments) {
-		pb, err := decodeMsg(args[1].([]byte))
-		require.NoError(t, err)
-		msg := pb.(*ssproto.ChunkRequest)
+		e, ok := args[0].(p2p.Envelope)
+		require.True(t, ok)
+		msg := e.Message.(*ssproto.ChunkRequest)
 		require.EqualValues(t, 1, msg.Height)
 		require.EqualValues(t, 1, msg.Format)
 		require.LessOrEqual(t, msg.Index, uint32(len(chunks)))
@@ -162,8 +176,14 @@ func TestSyncer_SyncAny(t *testing.T) {
 		chunkRequests[msg.Index]++
 		chunkRequestsMtx.Unlock()
 	}
-	peerA.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true)
-	peerB.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true)
+	peerA.On("Send", mock.MatchedBy(func(i interface{}) bool {
+		e, ok := i.(p2p.Envelope)
+		return ok && e.ChannelID == ChunkChannel
+	})).Maybe().Run(onChunkRequest).Return(true)
+	peerB.On("Send", mock.MatchedBy(func(i interface{}) bool {
+		e, ok := i.(p2p.Envelope)
+		return ok && e.ChannelID == ChunkChannel
+	})).Maybe().Run(onChunkRequest).Return(true)
 
 	// The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1,
 	// which should cause it to keep the existing chunk 0 and 2, and restart restoration from


@@ -87,7 +87,7 @@ func toCSVRecords(rs []report.Report) [][]string {
 	}
 
 	res := make([][]string, total+1)
-	res[0] = []string{"experiment_id", "duration_ns", "block_time", "connections", "rate", "size"}
+	res[0] = []string{"experiment_id", "block_time", "duration_ns", "tx_hash", "connections", "rate", "size"}
 	offset := 1
 	for _, r := range rs {
 		idStr := r.ID.String()
@@ -95,7 +95,7 @@ func toCSVRecords(rs []report.Report) [][]string {
 		rateStr := strconv.FormatInt(int64(r.Rate), 10)
 		sizeStr := strconv.FormatInt(int64(r.Size), 10)
 		for i, v := range r.All {
-			res[offset+i] = []string{idStr, strconv.FormatInt(int64(v.Duration), 10), strconv.FormatInt(v.BlockTime.UnixNano(), 10), connStr, rateStr, sizeStr}
+			res[offset+i] = []string{idStr, strconv.FormatInt(v.BlockTime.UnixNano(), 10), strconv.FormatInt(int64(v.Duration), 10), fmt.Sprintf("%X", v.Hash), connStr, rateStr, sizeStr}
 		}
 		offset += len(r.All)
 	}
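With the reordered header and the added tx_hash column, each row now pairs a latency measurement with the transaction that produced it. Illustrative output (values invented, not from a real run):

	experiment_id,block_time,duration_ns,tx_hash,connections,rate,size
	3f1c7a52-0c5a-4b3e-9f6d-2a64c1e0b7aa,1666372800000000000,250000000,AB12CD34EF56AB12CD34EF56AB12CD34EF56AB12CD34EF56AB12CD34EF56AB12,1,1000,1024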


@@ -3,7 +3,7 @@ package payload
 import (
 	"bytes"
 	"crypto/rand"
-	"errors"
+	"encoding/hex"
 	"fmt"
 	"math"
@@ -12,6 +12,7 @@ import (
 )
 
 const keyPrefix = "a="
+const maxPayloadSize = 4 * 1024 * 1024
 
 // NewBytes generates a new payload and returns the encoded representation of
 // the payload as a slice of bytes. NewBytes uses the fields on the Options
@@ -25,10 +26,16 @@ func NewBytes(p *Payload) ([]byte, error) {
 	if err != nil {
 		return nil, err
 	}
-	if p.Size < uint64(us) {
-		return nil, fmt.Errorf("configured size %d not large enough to fit unpadded transaction of size %d", p.Size, us)
+	if p.Size > maxPayloadSize {
+		return nil, fmt.Errorf("configured size %d is too large (>%d)", p.Size, maxPayloadSize)
 	}
-	p.Padding = make([]byte, p.Size-uint64(us))
+	pSize := int(p.Size) // #nosec -- The "if" above makes this cast safe
+	if pSize < us {
+		return nil, fmt.Errorf("configured size %d not large enough to fit unpadded transaction of size %d", pSize, us)
+	}
+	// We halve the padding size because we transform the TX to hex
+	p.Padding = make([]byte, (pSize-us)/2)
 	_, err = rand.Read(p.Padding)
 	if err != nil {
 		return nil, err
@@ -37,22 +44,28 @@ func NewBytes(p *Payload) ([]byte, error) {
 	if err != nil {
 		return nil, err
 	}
+	h := []byte(hex.EncodeToString(b))
 
 	// prepend a single key so that the kv store only ever stores a single
 	// transaction instead of storing all tx and ballooning in size.
-	return append([]byte(keyPrefix), b...), nil
+	return append([]byte(keyPrefix), h...), nil
 }
 
 // FromBytes extracts a payload from the byte representation of the payload.
 // FromBytes leaves the padding untouched, returning it to the caller to handle
 // or discard per their preference.
 func FromBytes(b []byte) (*Payload, error) {
-	p := &Payload{}
-	tr := bytes.TrimPrefix(b, []byte(keyPrefix))
-	if bytes.Equal(b, tr) {
-		return nil, errors.New("payload bytes missing key prefix")
+	trH := bytes.TrimPrefix(b, []byte(keyPrefix))
+	if bytes.Equal(b, trH) {
+		return nil, fmt.Errorf("payload bytes missing key prefix '%s'", keyPrefix)
 	}
-	err := proto.Unmarshal(tr, p)
+	trB, err := hex.DecodeString(string(trH))
+	if err != nil {
+		return nil, err
+	}
+	p := &Payload{}
+	err = proto.Unmarshal(trB, p)
 	if err != nil {
 		return nil, err
 	}
@@ -83,5 +96,6 @@ func CalculateUnpaddedSize(p *Payload) (int, error) {
 	if err != nil {
 		return 0, err
 	}
-	return len(b) + len(keyPrefix), nil
+	h := []byte(hex.EncodeToString(b))
+	return len(h) + len(keyPrefix), nil
 }
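The halved padding follows from hex encoding doubling every byte: CalculateUnpaddedSize now measures the hex-encoded transaction, so each raw padding byte contributes two characters to the final output. With invented numbers: for p.Size = 1024 and an unpadded size us = 100, the padding is (1024-100)/2 = 462 random bytes, which hex-encode to 924 characters, and 100 + 924 lands on the configured 1024 (give or take a byte when pSize-us is odd, plus a few bytes of protobuf framing for the padding field). The new maxPayloadSize guard is also what makes the int(p.Size) cast safe, which is what the #nosec annotation records.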


@@ -25,6 +25,7 @@ type BlockStore interface {
 type DataPoint struct {
 	Duration  time.Duration
 	BlockTime time.Time
+	Hash      []byte
 }
 
 // Report contains the data calculated from reading the timestamped transactions
@@ -68,7 +69,7 @@ func (rs *Reports) ErrorCount() int {
 	return rs.errorCount
 }
 
-func (rs *Reports) addDataPoint(id uuid.UUID, l time.Duration, bt time.Time, conns, rate, size uint64) {
+func (rs *Reports) addDataPoint(id uuid.UUID, l time.Duration, bt time.Time, hash []byte, conns, rate, size uint64) {
 	r, ok := rs.s[id]
 	if !ok {
 		r = Report{
@@ -81,7 +82,7 @@ func (rs *Reports) addDataPoint(id uuid.UUID, l time.Duration, bt time.Time, con
 		}
 		rs.s[id] = r
 	}
-	r.All = append(r.All, DataPoint{Duration: l, BlockTime: bt})
+	r.All = append(r.All, DataPoint{Duration: l, BlockTime: bt, Hash: hash})
 	if l > r.Max {
 		r.Max = l
 	}
@@ -123,11 +124,12 @@ func GenerateFromBlockStore(s BlockStore) (*Reports, error) {
 		id                      uuid.UUID
 		l                       time.Duration
 		bt                      time.Time
+		hash                    []byte
 		connections, rate, size uint64
 		err                     error
 	}
 	type txData struct {
-		tx []byte
+		tx types.Tx
 		bt time.Time
 	}
 	reports := &Reports{
@@ -161,6 +163,7 @@ func GenerateFromBlockStore(s BlockStore) (*Reports, error) {
 			pdc <- payloadData{
 				l:           l,
 				bt:          b.bt,
+				hash:        b.tx.Hash(),
 				id:          uuid.UUID(*idb),
 				connections: p.Connections,
 				rate:        p.Rate,
@@ -202,7 +205,7 @@ func GenerateFromBlockStore(s BlockStore) (*Reports, error) {
 			reports.addError()
 			continue
 		}
-		reports.addDataPoint(pd.id, pd.l, pd.bt, pd.connections, pd.rate, pd.size)
+		reports.addDataPoint(pd.id, pd.l, pd.bt, pd.hash, pd.connections, pd.rate, pd.size)
 	}
 	reports.calculateAll()
 	return reports, nil
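Changing txData.tx from []byte to types.Tx is what makes the hash available: types.Tx is a byte slice with a Hash() method, so each data point, and therefore each CSV row, can be traced back to a specific on-chain transaction. A minimal illustration, assuming tendermint's types package:

	tx := types.Tx([]byte("a=deadbeef"))
	fmt.Printf("%X\n", tx.Hash()) // 32-byte digest identifying this tx in the report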