mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-12 15:52:50 +00:00
Compare commits
38 Commits
sam/abci++
...
wb/impleme
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1ab868180c | ||
|
|
27a64acc9a | ||
|
|
34ae6ae9b6 | ||
|
|
e3e7e9ec9f | ||
|
|
5b86c5562a | ||
|
|
36decbb4c8 | ||
|
|
2f7844b0b9 | ||
|
|
4ae93fb379 | ||
|
|
ca2dcac7f6 | ||
|
|
9e1c7bdfb8 | ||
|
|
4b5373b2a5 | ||
|
|
9cad0c1b33 | ||
|
|
5c173cbb3d | ||
|
|
9742dac312 | ||
|
|
42fc1fc1cc | ||
|
|
3c6e81ad15 | ||
|
|
3ab55dcffc | ||
|
|
11fe2c206d | ||
|
|
ade3cee699 | ||
|
|
8015f6e254 | ||
|
|
24c1f44085 | ||
|
|
27db33dbda | ||
|
|
61834bb5fc | ||
|
|
2b7b2f2012 | ||
|
|
9b2c2ee3af | ||
|
|
b11060153e | ||
|
|
a105d2d0ea | ||
|
|
4d78096843 | ||
|
|
b42c439776 | ||
|
|
387bf6795a | ||
|
|
4f3e87b2e4 | ||
|
|
a371b1e3a8 | ||
|
|
9dd99e9294 | ||
|
|
4fd19a275e | ||
|
|
8d26460f9d | ||
|
|
c0bdb2423a | ||
|
|
cdd3479f20 | ||
|
|
b1dc5a6def |
@@ -96,3 +96,4 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
|
||||
|
||||
- [consensus] \#9229 fix round number of `enterPropose` when handling `RoundStepNewRound` timeout. (@fatcat22)
|
||||
- [docker] \#9073 enable cross platform build using docker buildx
|
||||
- [blocksync] \#9518 handle the case when the sending queue is full: retry block request after a timeout
|
||||
@@ -12,7 +12,7 @@ and hence to Tendermint.
|
||||
* We are committed to providing a friendly, safe and welcoming environment for
|
||||
all, regardless of level of experience, gender, gender identity and
|
||||
expression, sexual orientation, disability, personal appearance, body size,
|
||||
race, ethnicity, age, religion, nationality, or other similar characteristic.
|
||||
race, ethnicity, age, religion, nationality, or other similar characteristics.
|
||||
|
||||
* On Slack, please avoid using overtly sexual nicknames or other nicknames that
|
||||
might detract from a friendly, safe and welcoming environment for all.
|
||||
|
||||
@@ -12,7 +12,7 @@ landing changes in `main`.
|
||||
All work on the code base should be motivated by a [Github
|
||||
Issue](https://github.com/tendermint/tendermint/issues).
|
||||
[Search](https://github.com/tendermint/tendermint/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22)
|
||||
is a good place start when looking for places to contribute. If you
|
||||
is a good place to start when looking for places to contribute. If you
|
||||
would like to work on an issue which already exists, please indicate so
|
||||
by leaving a comment.
|
||||
|
||||
@@ -213,7 +213,7 @@ Changes with multiple classifications should be doubly included (eg. a bug fix
|
||||
that is also a breaking change should be recorded under both).
|
||||
|
||||
Breaking changes are further subdivided according to the APIs/users they impact.
|
||||
Any change that effects multiple APIs/users should be recorded multiply - for
|
||||
Any change that affects multiple APIs/users should be recorded multiply - for
|
||||
instance, a change to the `Blockchain Protocol` that removes a field from the
|
||||
header should also be recorded under `CLI/RPC/Config` since the field will be
|
||||
removed from the header in RPC responses as well.
|
||||
@@ -247,7 +247,7 @@ To begin contributing, create a development branch either on `github.com/tenderm
|
||||
Make changes, and before submitting a pull request, update the `CHANGELOG_PENDING.md` to record your change. Also, run either `git rebase` or `git merge` on top of the latest `main`. (Since pull requests are squash-merged, either is fine!)
|
||||
|
||||
Update the `UPGRADING.md` if the change you've made is breaking and the
|
||||
instructions should be in place for a user on how he/she can upgrade it's
|
||||
instructions should be in place for a user on how he/she can upgrade its
|
||||
software (ABCI application, Tendermint-based blockchain, light client, wallet).
|
||||
|
||||
Once you have submitted a pull request label the pull request with either `R:minor`, if the change should be included in the next minor release, or `R:major`, if the change is meant for a major release.
|
||||
|
||||
@@ -19,58 +19,6 @@ const (
|
||||
BlockResponseMessageFieldKeySize
|
||||
)
|
||||
|
||||
// EncodeMsg encodes a Protobuf message
|
||||
func EncodeMsg(pb proto.Message) ([]byte, error) {
|
||||
msg := bcproto.Message{}
|
||||
|
||||
switch pb := pb.(type) {
|
||||
case *bcproto.BlockRequest:
|
||||
msg.Sum = &bcproto.Message_BlockRequest{BlockRequest: pb}
|
||||
case *bcproto.BlockResponse:
|
||||
msg.Sum = &bcproto.Message_BlockResponse{BlockResponse: pb}
|
||||
case *bcproto.NoBlockResponse:
|
||||
msg.Sum = &bcproto.Message_NoBlockResponse{NoBlockResponse: pb}
|
||||
case *bcproto.StatusRequest:
|
||||
msg.Sum = &bcproto.Message_StatusRequest{StatusRequest: pb}
|
||||
case *bcproto.StatusResponse:
|
||||
msg.Sum = &bcproto.Message_StatusResponse{StatusResponse: pb}
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message type %T", pb)
|
||||
}
|
||||
|
||||
bz, err := proto.Marshal(&msg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to marshal %T: %w", pb, err)
|
||||
}
|
||||
|
||||
return bz, nil
|
||||
}
|
||||
|
||||
// DecodeMsg decodes a Protobuf message.
|
||||
func DecodeMsg(bz []byte) (proto.Message, error) {
|
||||
pb := &bcproto.Message{}
|
||||
|
||||
err := proto.Unmarshal(bz, pb)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch msg := pb.Sum.(type) {
|
||||
case *bcproto.Message_BlockRequest:
|
||||
return msg.BlockRequest, nil
|
||||
case *bcproto.Message_BlockResponse:
|
||||
return msg.BlockResponse, nil
|
||||
case *bcproto.Message_NoBlockResponse:
|
||||
return msg.NoBlockResponse, nil
|
||||
case *bcproto.Message_StatusRequest:
|
||||
return msg.StatusRequest, nil
|
||||
case *bcproto.Message_StatusResponse:
|
||||
return msg.StatusResponse, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message type %T", msg)
|
||||
}
|
||||
}
|
||||
|
||||
// ValidateMsg validates a message.
|
||||
func ValidateMsg(pb proto.Message) error {
|
||||
if pb == nil {
|
||||
|
||||
@@ -80,7 +80,7 @@ func TestBcStatusResponseMessageValidateBasic(t *testing.T) {
|
||||
}
|
||||
|
||||
//nolint:lll // ignore line length in tests
|
||||
func TestBlockchainMessageVectors(t *testing.T) {
|
||||
func TestBlocksyncMessageVectors(t *testing.T) {
|
||||
block := types.MakeBlock(int64(3), []types.Tx{types.Tx("Hello World")}, nil, nil)
|
||||
block.Version.Block = 11 // overwrite updated protocol version
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ const (
|
||||
maxTotalRequesters = 600
|
||||
maxPendingRequests = maxTotalRequesters
|
||||
maxPendingRequestsPerPeer = 20
|
||||
requestRetrySeconds = 30
|
||||
|
||||
// Minimum recv rate to ensure we're receiving blocks from a peer fast
|
||||
// enough. If a peer is not sending us data at at least that rate, we
|
||||
@@ -602,7 +603,7 @@ OUTER_LOOP:
|
||||
}
|
||||
peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
|
||||
if peer == nil {
|
||||
// log.Info("No peers available", "height", height)
|
||||
bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height)
|
||||
time.Sleep(requestIntervalMS * time.Millisecond)
|
||||
continue PICK_PEER_LOOP
|
||||
}
|
||||
@@ -612,6 +613,7 @@ OUTER_LOOP:
|
||||
bpr.peerID = peer.id
|
||||
bpr.mtx.Unlock()
|
||||
|
||||
to := time.NewTimer(requestRetrySeconds * time.Second)
|
||||
// Send request and wait.
|
||||
bpr.pool.sendRequest(bpr.height, peer.id)
|
||||
WAIT_LOOP:
|
||||
@@ -624,6 +626,11 @@ OUTER_LOOP:
|
||||
return
|
||||
case <-bpr.Quit():
|
||||
return
|
||||
case <-to.C:
|
||||
bpr.Logger.Debug("Retrying block request after timeout", "height", bpr.height, "peer", bpr.peerID)
|
||||
// Simulate a redo
|
||||
bpr.reset()
|
||||
continue OUTER_LOOP
|
||||
case peerID := <-bpr.redoCh:
|
||||
if peerID == bpr.peerID {
|
||||
bpr.reset()
|
||||
|
||||
@@ -30,7 +30,7 @@ const (
|
||||
)
|
||||
|
||||
type consensusReactor interface {
|
||||
// for when we switch from blockchain reactor and block sync to
|
||||
// for when we switch from blocksync reactor and block sync to
|
||||
// the consensus machine
|
||||
SwitchToConsensus(state sm.State, skipWAL bool)
|
||||
}
|
||||
@@ -143,21 +143,20 @@ func (bcR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
SendQueueCapacity: 1000,
|
||||
RecvBufferCapacity: 50 * 4096,
|
||||
RecvMessageCapacity: MaxMsgSize,
|
||||
MessageType: &bcproto.Message{},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// AddPeer implements Reactor by sending our state to peer.
|
||||
func (bcR *Reactor) AddPeer(peer p2p.Peer) {
|
||||
msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
|
||||
Base: bcR.store.Base(),
|
||||
Height: bcR.store.Height()})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
peer.Send(BlocksyncChannel, msgBytes)
|
||||
peer.Send(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.StatusResponse{
|
||||
Base: bcR.store.Base(),
|
||||
Height: bcR.store.Height(),
|
||||
},
|
||||
})
|
||||
// it's OK if send fails. will try later in poolRoutine
|
||||
|
||||
// peer is added to the pool once we receive the first
|
||||
@@ -182,69 +181,53 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
|
||||
return false
|
||||
}
|
||||
|
||||
msgBytes, err := EncodeMsg(&bcproto.BlockResponse{Block: bl})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not marshal msg", "err", err)
|
||||
return false
|
||||
}
|
||||
|
||||
return src.TrySend(BlocksyncChannel, msgBytes)
|
||||
return src.TrySend(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.BlockResponse{Block: bl},
|
||||
})
|
||||
}
|
||||
|
||||
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
|
||||
|
||||
msgBytes, err := EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
|
||||
return false
|
||||
}
|
||||
|
||||
return src.TrySend(BlocksyncChannel, msgBytes)
|
||||
return src.TrySend(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.NoBlockResponse{Height: msg.Height},
|
||||
})
|
||||
}
|
||||
|
||||
// Receive implements Reactor by handling 4 types of messages (look below).
|
||||
func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
msg, err := DecodeMsg(msgBytes)
|
||||
if err != nil {
|
||||
bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
|
||||
bcR.Switch.StopPeerForError(src, err)
|
||||
func (bcR *Reactor) Receive(e p2p.Envelope) {
|
||||
if err := ValidateMsg(e.Message); err != nil {
|
||||
bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
|
||||
bcR.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err = ValidateMsg(msg); err != nil {
|
||||
bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
|
||||
bcR.Switch.StopPeerForError(src, err)
|
||||
return
|
||||
}
|
||||
bcR.Logger.Debug("Receive", "e.Src", e.Src, "chID", e.ChannelID, "msg", e.Message)
|
||||
|
||||
bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)
|
||||
|
||||
switch msg := msg.(type) {
|
||||
switch msg := e.Message.(type) {
|
||||
case *bcproto.BlockRequest:
|
||||
bcR.respondToPeer(msg, src)
|
||||
bcR.respondToPeer(msg, e.Src)
|
||||
case *bcproto.BlockResponse:
|
||||
bi, err := types.BlockFromProto(msg.Block)
|
||||
if err != nil {
|
||||
bcR.Logger.Error("Block content is invalid", "err", err)
|
||||
return
|
||||
}
|
||||
bcR.pool.AddBlock(src.ID(), bi, len(msgBytes))
|
||||
bcR.pool.AddBlock(e.Src.ID(), bi, msg.Block.Size())
|
||||
case *bcproto.StatusRequest:
|
||||
// Send peer our state.
|
||||
msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
|
||||
Height: bcR.store.Height(),
|
||||
Base: bcR.store.Base(),
|
||||
e.Src.TrySend(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.StatusResponse{
|
||||
Height: bcR.store.Height(),
|
||||
Base: bcR.store.Base(),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert msg to protobut", "err", err)
|
||||
return
|
||||
}
|
||||
src.TrySend(BlocksyncChannel, msgBytes)
|
||||
case *bcproto.StatusResponse:
|
||||
// Got a peer status. Unverified.
|
||||
bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height)
|
||||
bcR.pool.SetPeerRange(e.Src.ID(), msg.Base, msg.Height)
|
||||
case *bcproto.NoBlockResponse:
|
||||
bcR.Logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height)
|
||||
bcR.Logger.Debug("Peer does not have requested block", "peer", e.Src, "height", msg.Height)
|
||||
default:
|
||||
bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
|
||||
}
|
||||
@@ -285,13 +268,10 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {
|
||||
if peer == nil {
|
||||
continue
|
||||
}
|
||||
msgBytes, err := EncodeMsg(&bcproto.BlockRequest{Height: request.Height})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert msg to proto", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
queued := peer.TrySend(BlocksyncChannel, msgBytes)
|
||||
queued := peer.TrySend(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.BlockRequest{Height: request.Height},
|
||||
})
|
||||
if !queued {
|
||||
bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
|
||||
}
|
||||
@@ -430,13 +410,9 @@ FOR_LOOP:
|
||||
|
||||
// BroadcastStatusRequest broadcasts `BlockStore` base and height.
|
||||
func (bcR *Reactor) BroadcastStatusRequest() error {
|
||||
bm, err := EncodeMsg(&bcproto.StatusRequest{})
|
||||
if err != nil {
|
||||
bcR.Logger.Error("could not convert msg to proto", "err", err)
|
||||
return fmt.Errorf("could not convert msg to proto: %w", err)
|
||||
}
|
||||
|
||||
bcR.Switch.Broadcast(BlocksyncChannel, bm)
|
||||
|
||||
bcR.Switch.NewBroadcast(p2p.Envelope{
|
||||
ChannelID: BlocksyncChannel,
|
||||
Message: &bcproto.StatusRequest{},
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -146,13 +146,13 @@ func newReactor(
|
||||
}
|
||||
|
||||
bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync)
|
||||
bcReactor.SetLogger(logger.With("module", "blockchain"))
|
||||
bcReactor.SetLogger(logger.With("module", "blocksync"))
|
||||
|
||||
return ReactorPair{bcReactor, proxyApp}
|
||||
}
|
||||
|
||||
func TestNoBlockResponse(t *testing.T) {
|
||||
config = test.ResetTestRoot("blockchain_reactor_test")
|
||||
config = test.ResetTestRoot("blocksync_reactor_test")
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
genDoc, privVals := randGenesisDoc(1, false, 30)
|
||||
|
||||
@@ -164,7 +164,7 @@ func TestNoBlockResponse(t *testing.T) {
|
||||
reactorPairs[1] = newReactor(t, log.TestingLogger(), genDoc, privVals, 0)
|
||||
|
||||
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
|
||||
s.AddReactor("BLOCKSYNC", reactorPairs[i].reactor)
|
||||
return s
|
||||
|
||||
}, p2p.Connect2Switches)
|
||||
@@ -214,7 +214,7 @@ func TestNoBlockResponse(t *testing.T) {
|
||||
// Alternatively we could actually dial a TCP conn but
|
||||
// that seems extreme.
|
||||
func TestBadBlockStopsPeer(t *testing.T) {
|
||||
config = test.ResetTestRoot("blockchain_reactor_test")
|
||||
config = test.ResetTestRoot("blocksync_reactor_test")
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
genDoc, privVals := randGenesisDoc(1, false, 30)
|
||||
|
||||
@@ -239,7 +239,7 @@ func TestBadBlockStopsPeer(t *testing.T) {
|
||||
reactorPairs[3] = newReactor(t, log.TestingLogger(), genDoc, privVals, 0)
|
||||
|
||||
switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
|
||||
s.AddReactor("BLOCKSYNC", reactorPairs[i].reactor)
|
||||
return s
|
||||
|
||||
}, p2p.Connect2Switches)
|
||||
@@ -278,7 +278,7 @@ func TestBadBlockStopsPeer(t *testing.T) {
|
||||
reactorPairs = append(reactorPairs, lastReactorPair)
|
||||
|
||||
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
|
||||
s.AddReactor("BLOCKSYNC", reactorPairs[len(reactorPairs)-1].reactor)
|
||||
return s
|
||||
|
||||
}, p2p.Connect2Switches)...)
|
||||
|
||||
@@ -26,6 +26,7 @@ import (
|
||||
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
|
||||
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
|
||||
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/store"
|
||||
@@ -165,10 +166,16 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
|
||||
for i, peer := range peerList {
|
||||
if i < len(peerList)/2 {
|
||||
bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer)
|
||||
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote1}))
|
||||
peer.Send(p2p.Envelope{
|
||||
Message: &tmcons.Vote{prevote1.ToProto()},
|
||||
ChannelID: VoteChannel,
|
||||
})
|
||||
} else {
|
||||
bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer)
|
||||
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote2}))
|
||||
peer.Send(p2p.Envelope{
|
||||
Message: &tmcons.Vote{prevote2.ToProto()},
|
||||
ChannelID: VoteChannel,
|
||||
})
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -520,18 +527,26 @@ func sendProposalAndParts(
|
||||
parts *types.PartSet,
|
||||
) {
|
||||
// proposal
|
||||
msg := &ProposalMessage{Proposal: proposal}
|
||||
peer.Send(DataChannel, MustEncode(msg))
|
||||
peer.Send(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.Proposal{Proposal: *proposal.ToProto()},
|
||||
})
|
||||
|
||||
// parts
|
||||
for i := 0; i < int(parts.Total()); i++ {
|
||||
part := parts.GetPart(i)
|
||||
msg := &BlockPartMessage{
|
||||
Height: height, // This tells peer that this part applies to us.
|
||||
Round: round, // This tells peer that this part applies to us.
|
||||
Part: part,
|
||||
pp, err := part.ToProto()
|
||||
if err != nil {
|
||||
panic(err) // TODO: wbanfield better error handling
|
||||
}
|
||||
peer.Send(DataChannel, MustEncode(msg))
|
||||
peer.Send(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.BlockPart{
|
||||
Height: height, // This tells peer that this part applies to us.
|
||||
Round: round, // This tells peer that this part applies to us.
|
||||
Part: *pp,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// votes
|
||||
@@ -539,9 +554,14 @@ func sendProposalAndParts(
|
||||
prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header())
|
||||
precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header())
|
||||
cs.mtx.Unlock()
|
||||
|
||||
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
|
||||
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
|
||||
peer.Send(p2p.Envelope{
|
||||
ChannelID: VoteChannel,
|
||||
Message: &tmcons.Vote{prevote.ToProto()},
|
||||
})
|
||||
peer.Send(p2p.Envelope{
|
||||
ChannelID: VoteChannel,
|
||||
Message: &tmcons.Vote{precommit.ToProto()},
|
||||
})
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
@@ -579,7 +599,7 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
|
||||
func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
br.reactor.RemovePeer(peer, reason)
|
||||
}
|
||||
func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
br.reactor.Receive(chID, peer, msgBytes)
|
||||
func (br *ByzantineReactor) Receive(e p2p.Envelope) {
|
||||
br.reactor.Receive(e)
|
||||
}
|
||||
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmrand "github.com/tendermint/tendermint/libs/rand"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
|
||||
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
@@ -94,7 +95,10 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw
|
||||
peers := sw.Peers().List()
|
||||
for _, peer := range peers {
|
||||
cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer)
|
||||
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
|
||||
peer.Send(p2p.Envelope{
|
||||
Message: &tmcons.Vote{precommit.ToProto()},
|
||||
ChannelID: VoteChannel,
|
||||
})
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -4,8 +4,6 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
|
||||
cstypes "github.com/tendermint/tendermint/consensus/types"
|
||||
"github.com/tendermint/tendermint/libs/bits"
|
||||
tmmath "github.com/tendermint/tendermint/libs/math"
|
||||
@@ -15,7 +13,9 @@ import (
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// MsgToProto takes a consensus message type and returns the proto defined consensus message
|
||||
// MsgToProto takes a consensus message type and returns the proto defined consensus message.
|
||||
//
|
||||
// TODO: This needs to be removed, but WALToProto depends on this.
|
||||
func MsgToProto(msg Message) (*tmcons.Message, error) {
|
||||
if msg == nil {
|
||||
return nil, errors.New("consensus: message is nil")
|
||||
@@ -260,20 +260,6 @@ func MsgFromProto(msg *tmcons.Message) (Message, error) {
|
||||
return pb, nil
|
||||
}
|
||||
|
||||
// MustEncode takes the reactors msg, makes it proto and marshals it
|
||||
// this mimics `MustMarshalBinaryBare` in that is panics on error
|
||||
func MustEncode(msg Message) []byte {
|
||||
pb, err := MsgToProto(msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
enc, err := proto.Marshal(pb)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return enc
|
||||
}
|
||||
|
||||
// WALToProto takes a WAL message and return a proto walMessage and error
|
||||
func WALToProto(msg WALMessage) (*tmcons.WALMessage, error) {
|
||||
var pb tmcons.WALMessage
|
||||
|
||||
@@ -7,8 +7,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
|
||||
cstypes "github.com/tendermint/tendermint/consensus/types"
|
||||
"github.com/tendermint/tendermint/libs/bits"
|
||||
tmevents "github.com/tendermint/tendermint/libs/events"
|
||||
@@ -148,6 +146,7 @@ func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
Priority: 6,
|
||||
SendQueueCapacity: 100,
|
||||
RecvMessageCapacity: maxMsgSize,
|
||||
MessageType: &tmcons.Message{},
|
||||
},
|
||||
{
|
||||
ID: DataChannel, // maybe split between gossiping current block and catchup stuff
|
||||
@@ -156,6 +155,7 @@ func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
SendQueueCapacity: 100,
|
||||
RecvBufferCapacity: 50 * 4096,
|
||||
RecvMessageCapacity: maxMsgSize,
|
||||
MessageType: &tmcons.Message{},
|
||||
},
|
||||
{
|
||||
ID: VoteChannel,
|
||||
@@ -163,6 +163,7 @@ func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
SendQueueCapacity: 100,
|
||||
RecvBufferCapacity: 100 * 100,
|
||||
RecvMessageCapacity: maxMsgSize,
|
||||
MessageType: &tmcons.Message{},
|
||||
},
|
||||
{
|
||||
ID: VoteSetBitsChannel,
|
||||
@@ -170,6 +171,7 @@ func (conR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
SendQueueCapacity: 2,
|
||||
RecvBufferCapacity: 1024,
|
||||
RecvMessageCapacity: maxMsgSize,
|
||||
MessageType: &tmcons.Message{},
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -223,34 +225,44 @@ func (conR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
// Peer state updates can happen in parallel, but processing of
|
||||
// proposals, block parts, and votes are ordered by the receiveRoutine
|
||||
// NOTE: blocks on consensus state for proposals, block parts, and votes
|
||||
func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
func (conR *Reactor) Receive(e p2p.Envelope) {
|
||||
if !conR.IsRunning() {
|
||||
conR.Logger.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)
|
||||
conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID)
|
||||
return
|
||||
}
|
||||
|
||||
msg, err := decodeMsg(msgBytes)
|
||||
if w, ok := e.Message.(p2p.Wrapper); ok {
|
||||
var err error
|
||||
e.Message, err = w.Wrap()
|
||||
if err != nil {
|
||||
conR.Logger.Error("Error wrapping message", "src", e.Src, "chId", e.ChannelID, "err", err)
|
||||
conR.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
msg, err := MsgFromProto(e.Message.(*tmcons.Message))
|
||||
if err != nil {
|
||||
conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
|
||||
conR.Switch.StopPeerForError(src, err)
|
||||
conR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
|
||||
conR.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err = msg.ValidateBasic(); err != nil {
|
||||
conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
|
||||
conR.Switch.StopPeerForError(src, err)
|
||||
conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
|
||||
conR.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
}
|
||||
|
||||
conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
|
||||
conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", msg)
|
||||
|
||||
// Get peer states
|
||||
ps, ok := src.Get(types.PeerStateKey).(*PeerState)
|
||||
ps, ok := e.Src.Get(types.PeerStateKey).(*PeerState)
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("Peer %v has no state", src))
|
||||
panic(fmt.Sprintf("Peer %v has no state", e.Src))
|
||||
}
|
||||
|
||||
switch chID {
|
||||
switch e.ChannelID {
|
||||
case StateChannel:
|
||||
switch msg := msg.(type) {
|
||||
case *NewRoundStepMessage:
|
||||
@@ -258,8 +270,8 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
initialHeight := conR.conS.state.InitialHeight
|
||||
conR.conS.mtx.Unlock()
|
||||
if err = msg.ValidateHeight(initialHeight); err != nil {
|
||||
conR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
|
||||
conR.Switch.StopPeerForError(src, err)
|
||||
conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err)
|
||||
conR.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
}
|
||||
ps.ApplyNewRoundStepMessage(msg)
|
||||
@@ -278,7 +290,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
// Peer claims to have a maj23 for some BlockID at H,R,S,
|
||||
err := votes.SetPeerMaj23(msg.Round, msg.Type, ps.peer.ID(), msg.BlockID)
|
||||
if err != nil {
|
||||
conR.Switch.StopPeerForError(src, err)
|
||||
conR.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
}
|
||||
// Respond with a VoteSetBitsMessage showing which votes we have.
|
||||
@@ -292,13 +304,19 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
default:
|
||||
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
|
||||
}
|
||||
src.TrySend(VoteSetBitsChannel, MustEncode(&VoteSetBitsMessage{
|
||||
eMsg := &tmcons.VoteSetBits{
|
||||
Height: msg.Height,
|
||||
Round: msg.Round,
|
||||
Type: msg.Type,
|
||||
BlockID: msg.BlockID,
|
||||
Votes: ourVotes,
|
||||
}))
|
||||
BlockID: msg.BlockID.ToProto(),
|
||||
}
|
||||
if votes := ourVotes.ToProto(); votes != nil {
|
||||
eMsg.Votes = *votes
|
||||
}
|
||||
e.Src.TrySend(p2p.Envelope{
|
||||
ChannelID: VoteSetBitsChannel,
|
||||
Message: eMsg,
|
||||
})
|
||||
default:
|
||||
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
|
||||
}
|
||||
@@ -311,13 +329,13 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
switch msg := msg.(type) {
|
||||
case *ProposalMessage:
|
||||
ps.SetHasProposal(msg.Proposal)
|
||||
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
|
||||
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
|
||||
case *ProposalPOLMessage:
|
||||
ps.ApplyProposalPOLMessage(msg)
|
||||
case *BlockPartMessage:
|
||||
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
|
||||
conR.Metrics.BlockParts.With("peer_id", string(src.ID())).Add(1)
|
||||
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
|
||||
conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1)
|
||||
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
|
||||
default:
|
||||
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
|
||||
}
|
||||
@@ -337,7 +355,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
|
||||
ps.SetHasVote(msg.Vote)
|
||||
|
||||
cs.peerMsgQueue <- msgInfo{msg, src.ID()}
|
||||
cs.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
|
||||
|
||||
default:
|
||||
// don't punish (leave room for soft upgrades)
|
||||
@@ -376,7 +394,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
}
|
||||
|
||||
default:
|
||||
conR.Logger.Error(fmt.Sprintf("Unknown chId %X", chID))
|
||||
conR.Logger.Error(fmt.Sprintf("Unknown chId %X", e.ChannelID))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -430,29 +448,39 @@ func (conR *Reactor) unsubscribeFromBroadcastEvents() {
|
||||
|
||||
func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
|
||||
nrsMsg := makeRoundStepMessage(rs)
|
||||
conR.Switch.Broadcast(StateChannel, MustEncode(nrsMsg))
|
||||
conR.Switch.NewBroadcast(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: nrsMsg,
|
||||
})
|
||||
}
|
||||
|
||||
func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
|
||||
csMsg := &NewValidBlockMessage{
|
||||
psh := rs.ProposalBlockParts.Header()
|
||||
csMsg := &tmcons.NewValidBlock{
|
||||
Height: rs.Height,
|
||||
Round: rs.Round,
|
||||
BlockPartSetHeader: rs.ProposalBlockParts.Header(),
|
||||
BlockParts: rs.ProposalBlockParts.BitArray(),
|
||||
BlockPartSetHeader: psh.ToProto(),
|
||||
BlockParts: rs.ProposalBlockParts.BitArray().ToProto(),
|
||||
IsCommit: rs.Step == cstypes.RoundStepCommit,
|
||||
}
|
||||
conR.Switch.Broadcast(StateChannel, MustEncode(csMsg))
|
||||
conR.Switch.NewBroadcast(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: csMsg,
|
||||
})
|
||||
}
|
||||
|
||||
// Broadcasts HasVoteMessage to peers that care.
|
||||
func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
|
||||
msg := &HasVoteMessage{
|
||||
msg := &tmcons.HasVote{
|
||||
Height: vote.Height,
|
||||
Round: vote.Round,
|
||||
Type: vote.Type,
|
||||
Index: vote.ValidatorIndex,
|
||||
}
|
||||
conR.Switch.Broadcast(StateChannel, MustEncode(msg))
|
||||
conR.Switch.NewBroadcast(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: msg,
|
||||
})
|
||||
/*
|
||||
// TODO: Make this broadcast more selective.
|
||||
for _, peer := range conR.Switch.Peers().List() {
|
||||
@@ -463,7 +491,11 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
|
||||
prs := ps.GetRoundState()
|
||||
if prs.Height == vote.Height {
|
||||
// TODO: Also filter on round?
|
||||
peer.TrySend(StateChannel, struct{ ConsensusMessage }{msg})
|
||||
e := p2p.Envelope{
|
||||
ChannelID: StateChannel, struct{ ConsensusMessage }{msg},
|
||||
Message: p,
|
||||
}
|
||||
peer.TrySend(e)
|
||||
} else {
|
||||
// Height doesn't match
|
||||
// TODO: check a field, maybe CatchupCommitRound?
|
||||
@@ -473,11 +505,11 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
|
||||
*/
|
||||
}
|
||||
|
||||
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage) {
|
||||
nrsMsg = &NewRoundStepMessage{
|
||||
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *tmcons.NewRoundStep) {
|
||||
nrsMsg = &tmcons.NewRoundStep{
|
||||
Height: rs.Height,
|
||||
Round: rs.Round,
|
||||
Step: rs.Step,
|
||||
Step: uint32(rs.Step),
|
||||
SecondsSinceStartTime: int64(time.Since(rs.StartTime).Seconds()),
|
||||
LastCommitRound: rs.LastCommit.GetRound(),
|
||||
}
|
||||
@@ -487,7 +519,10 @@ func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage)
|
||||
func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
|
||||
rs := conR.getRoundState()
|
||||
nrsMsg := makeRoundStepMessage(rs)
|
||||
peer.Send(StateChannel, MustEncode(nrsMsg))
|
||||
peer.Send(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: nrsMsg,
|
||||
})
|
||||
}
|
||||
|
||||
func (conR *Reactor) updateRoundStateRoutine() {
|
||||
@@ -526,13 +561,19 @@ OUTER_LOOP:
|
||||
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) {
|
||||
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
|
||||
part := rs.ProposalBlockParts.GetPart(index)
|
||||
msg := &BlockPartMessage{
|
||||
Height: rs.Height, // This tells peer that this part applies to us.
|
||||
Round: rs.Round, // This tells peer that this part applies to us.
|
||||
Part: part,
|
||||
parts, err := part.ToProto()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
|
||||
if peer.Send(DataChannel, MustEncode(msg)) {
|
||||
if peer.Send(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.BlockPart{
|
||||
Height: rs.Height, // This tells peer that this part applies to us.
|
||||
Round: rs.Round, // This tells peer that this part applies to us.
|
||||
Part: *parts,
|
||||
},
|
||||
}) {
|
||||
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
|
||||
}
|
||||
continue OUTER_LOOP
|
||||
@@ -578,9 +619,11 @@ OUTER_LOOP:
|
||||
if rs.Proposal != nil && !prs.Proposal {
|
||||
// Proposal: share the proposal metadata with peer.
|
||||
{
|
||||
msg := &ProposalMessage{Proposal: rs.Proposal}
|
||||
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
|
||||
if peer.Send(DataChannel, MustEncode(msg)) {
|
||||
if peer.Send(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.Proposal{Proposal: *rs.Proposal.ToProto()},
|
||||
}) {
|
||||
// NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected!
|
||||
ps.SetHasProposal(rs.Proposal)
|
||||
}
|
||||
@@ -590,13 +633,15 @@ OUTER_LOOP:
|
||||
// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
|
||||
// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
|
||||
if 0 <= rs.Proposal.POLRound {
|
||||
msg := &ProposalPOLMessage{
|
||||
Height: rs.Height,
|
||||
ProposalPOLRound: rs.Proposal.POLRound,
|
||||
ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
|
||||
}
|
||||
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
|
||||
peer.Send(DataChannel, MustEncode(msg))
|
||||
peer.Send(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.ProposalPOL{
|
||||
Height: rs.Height,
|
||||
ProposalPolRound: rs.Proposal.POLRound,
|
||||
ProposalPol: *rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray().ToProto(),
|
||||
},
|
||||
})
|
||||
}
|
||||
continue OUTER_LOOP
|
||||
}
|
||||
@@ -633,13 +678,20 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
|
||||
return
|
||||
}
|
||||
// Send the part
|
||||
msg := &BlockPartMessage{
|
||||
Height: prs.Height, // Not our height, so it doesn't matter.
|
||||
Round: prs.Round, // Not our height, so it doesn't matter.
|
||||
Part: part,
|
||||
}
|
||||
logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
|
||||
if peer.Send(DataChannel, MustEncode(msg)) {
|
||||
pp, err := part.ToProto()
|
||||
if err != nil {
|
||||
logger.Error("Could not convert part to proto", "index", index, "error", err)
|
||||
return
|
||||
}
|
||||
if peer.Send(p2p.Envelope{
|
||||
ChannelID: DataChannel,
|
||||
Message: &tmcons.BlockPart{
|
||||
Height: prs.Height, // Not our height, so it doesn't matter.
|
||||
Round: prs.Round, // Not our height, so it doesn't matter.
|
||||
Part: *pp,
|
||||
},
|
||||
}) {
|
||||
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
|
||||
} else {
|
||||
logger.Debug("Sending block part for catchup failed")
|
||||
@@ -798,12 +850,16 @@ OUTER_LOOP:
|
||||
prs := ps.GetRoundState()
|
||||
if rs.Height == prs.Height {
|
||||
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
|
||||
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
|
||||
Height: prs.Height,
|
||||
Round: prs.Round,
|
||||
Type: tmproto.PrevoteType,
|
||||
BlockID: maj23,
|
||||
}))
|
||||
|
||||
peer.TrySend(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: &tmcons.VoteSetMaj23{
|
||||
Height: prs.Height,
|
||||
Round: prs.Round,
|
||||
Type: tmproto.PrevoteType,
|
||||
BlockID: maj23.ToProto(),
|
||||
},
|
||||
})
|
||||
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
|
||||
}
|
||||
}
|
||||
@@ -815,12 +871,15 @@ OUTER_LOOP:
|
||||
prs := ps.GetRoundState()
|
||||
if rs.Height == prs.Height {
|
||||
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
|
||||
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
|
||||
Height: prs.Height,
|
||||
Round: prs.Round,
|
||||
Type: tmproto.PrecommitType,
|
||||
BlockID: maj23,
|
||||
}))
|
||||
peer.TrySend(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: &tmcons.VoteSetMaj23{
|
||||
Height: prs.Height,
|
||||
Round: prs.Round,
|
||||
Type: tmproto.PrecommitType,
|
||||
BlockID: maj23.ToProto(),
|
||||
},
|
||||
})
|
||||
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
|
||||
}
|
||||
}
|
||||
@@ -832,12 +891,16 @@ OUTER_LOOP:
|
||||
prs := ps.GetRoundState()
|
||||
if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
|
||||
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
|
||||
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
|
||||
Height: prs.Height,
|
||||
Round: prs.ProposalPOLRound,
|
||||
Type: tmproto.PrevoteType,
|
||||
BlockID: maj23,
|
||||
}))
|
||||
|
||||
peer.TrySend(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: &tmcons.VoteSetMaj23{
|
||||
Height: prs.Height,
|
||||
Round: prs.ProposalPOLRound,
|
||||
Type: tmproto.PrevoteType,
|
||||
BlockID: maj23.ToProto(),
|
||||
},
|
||||
})
|
||||
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
|
||||
}
|
||||
}
|
||||
@@ -852,12 +915,15 @@ OUTER_LOOP:
|
||||
if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() &&
|
||||
prs.Height >= conR.conS.blockStore.Base() {
|
||||
if commit := conR.conS.LoadCommit(prs.Height); commit != nil {
|
||||
peer.TrySend(StateChannel, MustEncode(&VoteSetMaj23Message{
|
||||
Height: prs.Height,
|
||||
Round: commit.Round,
|
||||
Type: tmproto.PrecommitType,
|
||||
BlockID: commit.BlockID,
|
||||
}))
|
||||
peer.TrySend(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Message: &tmcons.VoteSetMaj23{
|
||||
Height: prs.Height,
|
||||
Round: commit.Round,
|
||||
Type: tmproto.PrecommitType,
|
||||
BlockID: commit.BlockID.ToProto(),
|
||||
},
|
||||
})
|
||||
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
|
||||
}
|
||||
}
|
||||
@@ -1071,9 +1137,13 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in
|
||||
// Returns true if vote was sent.
|
||||
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
|
||||
if vote, ok := ps.PickVoteToSend(votes); ok {
|
||||
msg := &VoteMessage{vote}
|
||||
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
|
||||
if ps.peer.Send(VoteChannel, MustEncode(msg)) {
|
||||
if ps.peer.Send(p2p.Envelope{
|
||||
ChannelID: VoteChannel,
|
||||
Message: &tmcons.Vote{
|
||||
Vote: vote.ToProto(),
|
||||
},
|
||||
}) {
|
||||
ps.SetHasVote(vote)
|
||||
return true
|
||||
}
|
||||
@@ -1439,15 +1509,6 @@ func init() {
|
||||
tmjson.RegisterType(&VoteSetBitsMessage{}, "tendermint/VoteSetBits")
|
||||
}
|
||||
|
||||
func decodeMsg(bz []byte) (msg Message, err error) {
|
||||
pb := &tmcons.Message{}
|
||||
if err = proto.Unmarshal(bz, pb); err != nil {
|
||||
return msg, err
|
||||
}
|
||||
|
||||
return MsgFromProto(pb)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
// NewRoundStepMessage is sent for every step taken in the ConsensusState.
|
||||
|
||||
@@ -33,6 +33,7 @@ import (
|
||||
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
p2pmock "github.com/tendermint/tendermint/p2p/mock"
|
||||
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
|
||||
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
statemocks "github.com/tendermint/tendermint/state/mocks"
|
||||
@@ -265,15 +266,18 @@ func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {
|
||||
var (
|
||||
reactor = reactors[0]
|
||||
peer = p2pmock.NewPeer(nil)
|
||||
msg = MustEncode(&HasVoteMessage{Height: 1,
|
||||
Round: 1, Index: 1, Type: tmproto.PrevoteType})
|
||||
)
|
||||
|
||||
reactor.InitPeer(peer)
|
||||
|
||||
// simulate switch calling Receive before AddPeer
|
||||
assert.NotPanics(t, func() {
|
||||
reactor.Receive(StateChannel, peer, msg)
|
||||
reactor.Receive(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Src: peer,
|
||||
Message: &tmcons.HasVote{Height: 1,
|
||||
Round: 1, Index: 1, Type: tmproto.PrevoteType},
|
||||
})
|
||||
reactor.AddPeer(peer)
|
||||
})
|
||||
}
|
||||
@@ -288,15 +292,18 @@ func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
|
||||
var (
|
||||
reactor = reactors[0]
|
||||
peer = p2pmock.NewPeer(nil)
|
||||
msg = MustEncode(&HasVoteMessage{Height: 1,
|
||||
Round: 1, Index: 1, Type: tmproto.PrevoteType})
|
||||
)
|
||||
|
||||
// we should call InitPeer here
|
||||
|
||||
// simulate switch calling Receive before AddPeer
|
||||
assert.Panics(t, func() {
|
||||
reactor.Receive(StateChannel, peer, msg)
|
||||
reactor.Receive(p2p.Envelope{
|
||||
ChannelID: StateChannel,
|
||||
Src: peer,
|
||||
Message: &tmcons.HasVote{Height: 1,
|
||||
Round: 1, Index: 1, Type: tmproto.PrevoteType},
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
clist "github.com/tendermint/tendermint/libs/clist"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
@@ -55,6 +56,7 @@ func (evR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
ID: EvidenceChannel,
|
||||
Priority: 6,
|
||||
RecvMessageCapacity: maxMsgSize,
|
||||
MessageType: &tmproto.EvidenceList{},
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -66,11 +68,11 @@ func (evR *Reactor) AddPeer(peer p2p.Peer) {
|
||||
|
||||
// Receive implements Reactor.
|
||||
// It adds any received evidence to the evpool.
|
||||
func (evR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
evis, err := decodeMsg(msgBytes)
|
||||
func (evR *Reactor) Receive(e p2p.Envelope) {
|
||||
evis, err := evidenceListFromProto(e.Message)
|
||||
if err != nil {
|
||||
evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
|
||||
evR.Switch.StopPeerForError(src, err)
|
||||
evR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
|
||||
evR.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -80,7 +82,7 @@ func (evR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
case *types.ErrInvalidEvidence:
|
||||
evR.Logger.Error(err.Error())
|
||||
// punish peer
|
||||
evR.Switch.StopPeerForError(src, err)
|
||||
evR.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
case nil:
|
||||
default:
|
||||
@@ -126,11 +128,15 @@ func (evR *Reactor) broadcastEvidenceRoutine(peer p2p.Peer) {
|
||||
evis := evR.prepareEvidenceMessage(peer, ev)
|
||||
if len(evis) > 0 {
|
||||
evR.Logger.Debug("Gossiping evidence to peer", "ev", ev, "peer", peer)
|
||||
msgBytes, err := encodeMsg(evis)
|
||||
evp, err := evidenceListToProto(evis)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
success := peer.Send(EvidenceChannel, msgBytes)
|
||||
|
||||
success := peer.Send(p2p.Envelope{
|
||||
ChannelID: EvidenceChannel,
|
||||
Message: evp,
|
||||
})
|
||||
if !success {
|
||||
time.Sleep(peerRetryMessageIntervalMS * time.Millisecond)
|
||||
continue
|
||||
@@ -210,7 +216,7 @@ type PeerState interface {
|
||||
|
||||
// encodemsg takes a array of evidence
|
||||
// returns the byte encoding of the List Message
|
||||
func encodeMsg(evis []types.Evidence) ([]byte, error) {
|
||||
func evidenceListToProto(evis []types.Evidence) (*tmproto.EvidenceList, error) {
|
||||
evi := make([]tmproto.Evidence, len(evis))
|
||||
for i := 0; i < len(evis); i++ {
|
||||
ev, err := types.EvidenceToProto(evis[i])
|
||||
@@ -222,19 +228,13 @@ func encodeMsg(evis []types.Evidence) ([]byte, error) {
|
||||
epl := tmproto.EvidenceList{
|
||||
Evidence: evi,
|
||||
}
|
||||
|
||||
return epl.Marshal()
|
||||
return &epl, nil
|
||||
}
|
||||
|
||||
// decodemsg takes an array of bytes
|
||||
// returns an array of evidence
|
||||
func decodeMsg(bz []byte) (evis []types.Evidence, err error) {
|
||||
lm := tmproto.EvidenceList{}
|
||||
if err := lm.Unmarshal(bz); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
func evidenceListFromProto(m proto.Message) ([]types.Evidence, error) {
|
||||
lm := m.(*tmproto.EvidenceList)
|
||||
|
||||
evis = make([]types.Evidence, len(lm.Evidence))
|
||||
evis := make([]types.Evidence, len(lm.Evidence))
|
||||
for i := 0; i < len(lm.Evidence); i++ {
|
||||
ev, err := types.EvidenceFromProto(&lm.Evidence[i])
|
||||
if err != nil {
|
||||
|
||||
@@ -208,7 +208,10 @@ func TestReactorBroadcastEvidenceMemoryLeak(t *testing.T) {
|
||||
// i.e. broadcastEvidenceRoutine finishes when peer is stopped
|
||||
defer leaktest.CheckTimeout(t, 10*time.Second)()
|
||||
|
||||
p.On("Send", evidence.EvidenceChannel, mock.AnythingOfType("[]uint8")).Return(false)
|
||||
p.On("Send", mock.MatchedBy(func(i interface{}) bool {
|
||||
e, ok := i.(p2p.Envelope)
|
||||
return ok && e.ChannelID == evidence.EvidenceChannel
|
||||
})).Return(false)
|
||||
quitChan := make(<-chan struct{})
|
||||
p.On("Quit").Return(quitChan)
|
||||
ps := peerState{2}
|
||||
|
||||
2
go.mod
2
go.mod
@@ -34,7 +34,7 @@ require (
|
||||
github.com/tendermint/tm-db v0.6.6
|
||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa
|
||||
golang.org/x/net v0.0.0-20220812174116-3211cb980234
|
||||
google.golang.org/grpc v1.49.0
|
||||
google.golang.org/grpc v1.50.0
|
||||
)
|
||||
|
||||
require (
|
||||
|
||||
4
go.sum
4
go.sum
@@ -1696,8 +1696,8 @@ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ
|
||||
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
|
||||
google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k=
|
||||
google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
|
||||
google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw=
|
||||
google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
|
||||
google.golang.org/grpc v1.50.0 h1:fPVVDxY9w++VjTZsYvXWqEf9Rqar/e+9zYfxKK+W+YU=
|
||||
google.golang.org/grpc v1.50.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||
|
||||
@@ -134,6 +134,7 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
ID: mempool.MempoolChannel,
|
||||
Priority: 5,
|
||||
RecvMessageCapacity: batchMsg.Size(),
|
||||
MessageType: &protomem.Message{},
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -154,27 +155,33 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
|
||||
// Receive implements Reactor.
|
||||
// It adds any received transactions to the mempool.
|
||||
func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
msg, err := memR.decodeMsg(msgBytes)
|
||||
if err != nil {
|
||||
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
|
||||
memR.Switch.StopPeerForError(src, err)
|
||||
return
|
||||
}
|
||||
memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
|
||||
|
||||
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(src)}
|
||||
if src != nil {
|
||||
txInfo.SenderP2PID = src.ID()
|
||||
}
|
||||
|
||||
for _, tx := range msg.Txs {
|
||||
err = memR.mempool.CheckTx(tx, nil, txInfo)
|
||||
if errors.Is(err, mempool.ErrTxInCache) {
|
||||
memR.Logger.Debug("Tx already exists in cache", "tx", tx.String())
|
||||
} else if err != nil {
|
||||
memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err)
|
||||
func (memR *Reactor) Receive(e p2p.Envelope) {
|
||||
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
|
||||
switch msg := e.Message.(type) {
|
||||
case *protomem.Txs:
|
||||
protoTxs := msg.GetTxs()
|
||||
if len(protoTxs) == 0 {
|
||||
memR.Logger.Error("received tmpty txs from peer", "src", e.Src)
|
||||
return
|
||||
}
|
||||
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
|
||||
if e.Src != nil {
|
||||
txInfo.SenderP2PID = e.Src.ID()
|
||||
}
|
||||
|
||||
var err error
|
||||
for _, tx := range protoTxs {
|
||||
ntx := types.Tx(tx)
|
||||
err = memR.mempool.CheckTx(ntx, nil, txInfo)
|
||||
if errors.Is(err, mempool.ErrTxInCache) {
|
||||
memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String())
|
||||
} else if err != nil {
|
||||
memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err)
|
||||
}
|
||||
}
|
||||
default:
|
||||
memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
|
||||
return
|
||||
}
|
||||
|
||||
// broadcasting happens from go routines per peer
|
||||
@@ -234,18 +241,14 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
|
||||
// https://github.com/tendermint/tendermint/issues/5796
|
||||
|
||||
if _, ok := memTx.senders.Load(peerID); !ok {
|
||||
msg := protomem.Message{
|
||||
Sum: &protomem.Message_Txs{
|
||||
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
|
||||
success := peer.Send(p2p.Envelope{
|
||||
ChannelID: mempool.MempoolChannel,
|
||||
Message: &protomem.Message{
|
||||
Sum: &protomem.Message_Txs{
|
||||
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bz, err := msg.Marshal()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
success := peer.Send(mempool.MempoolChannel, bz)
|
||||
})
|
||||
if !success {
|
||||
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
|
||||
continue
|
||||
@@ -264,35 +267,6 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
|
||||
}
|
||||
}
|
||||
|
||||
func (memR *Reactor) decodeMsg(bz []byte) (TxsMessage, error) {
|
||||
msg := protomem.Message{}
|
||||
err := msg.Unmarshal(bz)
|
||||
if err != nil {
|
||||
return TxsMessage{}, err
|
||||
}
|
||||
|
||||
var message TxsMessage
|
||||
|
||||
if i, ok := msg.Sum.(*protomem.Message_Txs); ok {
|
||||
txs := i.Txs.GetTxs()
|
||||
|
||||
if len(txs) == 0 {
|
||||
return message, errors.New("empty TxsMessage")
|
||||
}
|
||||
|
||||
decoded := make([]types.Tx, len(txs))
|
||||
for j, tx := range txs {
|
||||
decoded[j] = types.Tx(tx)
|
||||
}
|
||||
|
||||
message = TxsMessage{
|
||||
Txs: decoded,
|
||||
}
|
||||
return message, nil
|
||||
}
|
||||
return message, fmt.Errorf("msg type: %T is not supported", msg)
|
||||
}
|
||||
|
||||
// TxsMessage is a Message containing transactions.
|
||||
type TxsMessage struct {
|
||||
Txs []types.Tx
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
"github.com/tendermint/tendermint/p2p/mock"
|
||||
memproto "github.com/tendermint/tendermint/proto/tendermint/mempool"
|
||||
protomem "github.com/tendermint/tendermint/proto/tendermint/mempool"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
@@ -279,7 +280,12 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) {
|
||||
|
||||
for i := 0; i < mempool.MaxActiveIDs+1; i++ {
|
||||
peer := mock.NewPeer(nil)
|
||||
reactor.Receive(mempool.MempoolChannel, peer, []byte{0x1, 0x2, 0x3})
|
||||
reactor.Receive(p2p.Envelope{
|
||||
ChannelID: mempool.MempoolChannel,
|
||||
Src: peer,
|
||||
Message: &protomem.Txs{
|
||||
Txs: [][]byte{{0x01, 0x02, 0x03}},
|
||||
}})
|
||||
reactor.AddPeer(peer)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,6 +133,7 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
ID: mempool.MempoolChannel,
|
||||
Priority: 5,
|
||||
RecvMessageCapacity: batchMsg.Size(),
|
||||
MessageType: &protomem.Message{},
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -153,27 +154,35 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
|
||||
// Receive implements Reactor.
|
||||
// It adds any received transactions to the mempool.
|
||||
func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
msg, err := memR.decodeMsg(msgBytes)
|
||||
if err != nil {
|
||||
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
|
||||
memR.Switch.StopPeerForError(src, err)
|
||||
func (memR *Reactor) Receive(e p2p.Envelope) {
|
||||
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
|
||||
switch msg := e.Message.(type) {
|
||||
case *protomem.Txs:
|
||||
protoTxs := msg.GetTxs()
|
||||
if len(protoTxs) == 0 {
|
||||
memR.Logger.Error("received tmpty txs from peer", "src", e.Src)
|
||||
return
|
||||
}
|
||||
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)}
|
||||
if e.Src != nil {
|
||||
txInfo.SenderP2PID = e.Src.ID()
|
||||
}
|
||||
|
||||
var err error
|
||||
for _, tx := range protoTxs {
|
||||
ntx := types.Tx(tx)
|
||||
err = memR.mempool.CheckTx(ntx, nil, txInfo)
|
||||
if errors.Is(err, mempool.ErrTxInCache) {
|
||||
memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String())
|
||||
} else if err != nil {
|
||||
memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err)
|
||||
}
|
||||
}
|
||||
default:
|
||||
memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
|
||||
return
|
||||
}
|
||||
memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
|
||||
|
||||
txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(src)}
|
||||
if src != nil {
|
||||
txInfo.SenderP2PID = src.ID()
|
||||
}
|
||||
for _, tx := range msg.Txs {
|
||||
err = memR.mempool.CheckTx(tx, nil, txInfo)
|
||||
if err == mempool.ErrTxInCache {
|
||||
memR.Logger.Debug("Tx already exists in cache", "tx", tx.String())
|
||||
} else if err != nil {
|
||||
memR.Logger.Info("Could not check tx", "tx", tx.String(), "err", err)
|
||||
}
|
||||
}
|
||||
// broadcasting happens from go routines per peer
|
||||
}
|
||||
|
||||
@@ -233,18 +242,14 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
|
||||
// NOTE: Transaction batching was disabled due to
|
||||
// https://github.com/tendermint/tendermint/issues/5796
|
||||
if !memTx.HasPeer(peerID) {
|
||||
msg := protomem.Message{
|
||||
Sum: &protomem.Message_Txs{
|
||||
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
|
||||
success := peer.Send(p2p.Envelope{
|
||||
ChannelID: mempool.MempoolChannel,
|
||||
Message: &protomem.Message{
|
||||
Sum: &protomem.Message_Txs{
|
||||
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bz, err := msg.Marshal()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
success := peer.Send(mempool.MempoolChannel, bz)
|
||||
})
|
||||
if !success {
|
||||
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
|
||||
continue
|
||||
@@ -268,37 +273,6 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
|
||||
//-----------------------------------------------------------------------------
|
||||
// Messages
|
||||
|
||||
func (memR *Reactor) decodeMsg(bz []byte) (TxsMessage, error) {
|
||||
msg := protomem.Message{}
|
||||
err := msg.Unmarshal(bz)
|
||||
if err != nil {
|
||||
return TxsMessage{}, err
|
||||
}
|
||||
|
||||
var message TxsMessage
|
||||
|
||||
if i, ok := msg.Sum.(*protomem.Message_Txs); ok {
|
||||
txs := i.Txs.GetTxs()
|
||||
|
||||
if len(txs) == 0 {
|
||||
return message, errors.New("empty TxsMessage")
|
||||
}
|
||||
|
||||
decoded := make([]types.Tx, len(txs))
|
||||
for j, tx := range txs {
|
||||
decoded[j] = types.Tx(tx)
|
||||
}
|
||||
|
||||
message = TxsMessage{
|
||||
Txs: decoded,
|
||||
}
|
||||
return message, nil
|
||||
}
|
||||
return message, fmt.Errorf("msg type: %T is not supported", msg)
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
// TxsMessage is a Message containing transactions.
|
||||
type TxsMessage struct {
|
||||
Txs []types.Tx
|
||||
|
||||
@@ -31,7 +31,7 @@ To replace the built-in p2p.Reactor, use the CustomReactors option:
|
||||
dbProvider,
|
||||
metricsProvider,
|
||||
logger,
|
||||
CustomReactors(map[string]p2p.Reactor{"BLOCKCHAIN": customBlockchainReactor}),
|
||||
CustomReactors(map[string]p2p.Reactor{"BLOCKSYNC": customBlocksyncReactor}),
|
||||
)
|
||||
|
||||
The list of existing reactors can be found in CustomReactors documentation.
|
||||
|
||||
20
node/node.go
20
node/node.go
@@ -146,7 +146,7 @@ type blockSyncReactor interface {
|
||||
// result in replacing it with the custom one.
|
||||
//
|
||||
// - MEMPOOL
|
||||
// - BLOCKCHAIN
|
||||
// - BLOCKSYNC
|
||||
// - CONSENSUS
|
||||
// - EVIDENCE
|
||||
// - PEX
|
||||
@@ -303,7 +303,7 @@ func createAndStartIndexerService(
|
||||
blockIndexer = &blockidxnull.BlockerIndexer{}
|
||||
}
|
||||
|
||||
indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
|
||||
indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
|
||||
indexerService.SetLogger(logger.With("module", "txindex"))
|
||||
|
||||
if err := indexerService.Start(); err != nil {
|
||||
@@ -441,7 +441,7 @@ func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider,
|
||||
return evidenceReactor, evidencePool, nil
|
||||
}
|
||||
|
||||
func createBlockchainReactor(config *cfg.Config,
|
||||
func createBlocksyncReactor(config *cfg.Config,
|
||||
state sm.State,
|
||||
blockExec *sm.BlockExecutor,
|
||||
blockStore *store.BlockStore,
|
||||
@@ -457,7 +457,7 @@ func createBlockchainReactor(config *cfg.Config,
|
||||
return nil, fmt.Errorf("unknown fastsync version %s", config.BlockSync.Version)
|
||||
}
|
||||
|
||||
bcReactor.SetLogger(logger.With("module", "blockchain"))
|
||||
bcReactor.SetLogger(logger.With("module", "blocksync"))
|
||||
return bcReactor, nil
|
||||
}
|
||||
|
||||
@@ -584,7 +584,7 @@ func createSwitch(config *cfg.Config,
|
||||
)
|
||||
sw.SetLogger(p2pLogger)
|
||||
sw.AddReactor("MEMPOOL", mempoolReactor)
|
||||
sw.AddReactor("BLOCKCHAIN", bcReactor)
|
||||
sw.AddReactor("BLOCKSYNC", bcReactor)
|
||||
sw.AddReactor("CONSENSUS", consensusReactor)
|
||||
sw.AddReactor("EVIDENCE", evidenceReactor)
|
||||
sw.AddReactor("STATESYNC", stateSyncReactor)
|
||||
@@ -803,7 +803,7 @@ func NewNode(config *cfg.Config,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// make block executor for consensus and blockchain reactors to execute blocks
|
||||
// make block executor for consensus and blocksync reactors to execute blocks
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
logger.With("module", "state"),
|
||||
@@ -814,10 +814,10 @@ func NewNode(config *cfg.Config,
|
||||
sm.BlockExecutorWithMetrics(smMetrics),
|
||||
)
|
||||
|
||||
// Make BlockchainReactor. Don't start block sync if we're doing a state sync first.
|
||||
bcReactor, err := createBlockchainReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger)
|
||||
// Make BlocksyncReactor. Don't start block sync if we're doing a state sync first.
|
||||
bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, blockSync && !stateSync, logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
|
||||
return nil, fmt.Errorf("could not create blocksync reactor: %w", err)
|
||||
}
|
||||
|
||||
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
|
||||
@@ -990,7 +990,7 @@ func (n *Node) OnStart() error {
|
||||
if n.stateSync {
|
||||
bcR, ok := n.bcReactor.(blockSyncReactor)
|
||||
if !ok {
|
||||
return fmt.Errorf("this blockchain reactor does not support switching from state sync")
|
||||
return fmt.Errorf("this blocksync reactor does not support switching from state sync")
|
||||
}
|
||||
err := startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, n.stateSyncProvider,
|
||||
n.config.StateSync, n.config.BlockSyncMode, n.stateStore, n.blockStore, n.stateSyncGenesis)
|
||||
|
||||
@@ -427,7 +427,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
|
||||
RecvMessageCapacity: 100,
|
||||
},
|
||||
}
|
||||
customBlockchainReactor := p2pmock.NewReactor()
|
||||
customBlocksyncReactor := p2pmock.NewReactor()
|
||||
|
||||
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
|
||||
require.NoError(t, err)
|
||||
@@ -440,7 +440,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
|
||||
DefaultDBProvider,
|
||||
DefaultMetricsProvider(config.Instrumentation),
|
||||
log.TestingLogger(),
|
||||
CustomReactors(map[string]p2p.Reactor{"FOO": cr, "BLOCKCHAIN": customBlockchainReactor}),
|
||||
CustomReactors(map[string]p2p.Reactor{"FOO": cr, "BLOCKSYNC": customBlocksyncReactor}),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -451,8 +451,8 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
|
||||
assert.True(t, cr.IsRunning())
|
||||
assert.Equal(t, cr, n.Switch().Reactor("FOO"))
|
||||
|
||||
assert.True(t, customBlockchainReactor.IsRunning())
|
||||
assert.Equal(t, customBlockchainReactor, n.Switch().Reactor("BLOCKCHAIN"))
|
||||
assert.True(t, customBlocksyncReactor.IsRunning())
|
||||
assert.Equal(t, customBlocksyncReactor, n.Switch().Reactor("BLOCKSYNC"))
|
||||
|
||||
channels := n.NodeInfo().(p2p.DefaultNodeInfo).Channels
|
||||
assert.Contains(t, channels, mempl.MempoolChannel)
|
||||
|
||||
@@ -38,13 +38,13 @@ type Reactor interface {
|
||||
// or other reason).
|
||||
RemovePeer(peer Peer, reason interface{})
|
||||
|
||||
// Receive is called by the switch when msgBytes is received from the peer.
|
||||
// Receive is called by the switch when a message is received from the peer.
|
||||
//
|
||||
// NOTE reactor can not keep msgBytes around after Receive completes without
|
||||
// copying.
|
||||
//
|
||||
// CONTRACT: msgBytes are not nil.
|
||||
Receive(chID byte, peer Peer, msgBytes []byte)
|
||||
Receive(Envelope)
|
||||
}
|
||||
|
||||
//--------------------------------------
|
||||
@@ -64,8 +64,8 @@ func NewBaseReactor(name string, impl Reactor) *BaseReactor {
|
||||
func (br *BaseReactor) SetSwitch(sw *Switch) {
|
||||
br.Switch = sw
|
||||
}
|
||||
func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil }
|
||||
func (*BaseReactor) AddPeer(peer Peer) {}
|
||||
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
|
||||
func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {}
|
||||
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }
|
||||
func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil }
|
||||
func (*BaseReactor) AddPeer(peer Peer) {}
|
||||
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
|
||||
func (*BaseReactor) Receive(e Envelope) {}
|
||||
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }
|
||||
|
||||
@@ -724,6 +724,7 @@ type ChannelDescriptor struct {
|
||||
SendQueueCapacity int
|
||||
RecvBufferCapacity int
|
||||
RecvMessageCapacity int
|
||||
MessageType proto.Message
|
||||
}
|
||||
|
||||
func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
|
||||
|
||||
@@ -145,6 +145,13 @@ func (e ErrTransportClosed) Error() string {
|
||||
return "transport has been closed"
|
||||
}
|
||||
|
||||
// ErrPeerRemoval is raised when attempting to remove a peer results in an error.
|
||||
type ErrPeerRemoval struct{}
|
||||
|
||||
func (e ErrPeerRemoval) Error() string {
|
||||
return "peer removal failed"
|
||||
}
|
||||
|
||||
//-------------------------------------------------------------------
|
||||
|
||||
type ErrNetAddressNoID struct {
|
||||
|
||||
@@ -44,15 +44,29 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
|
||||
Name: "num_txs",
|
||||
Help: "Number of transactions submitted by each peer.",
|
||||
}, append(labels, "peer_id")).With(labelsAndValues...),
|
||||
MessageReceiveBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "message_receive_bytes_total",
|
||||
Help: "Number of bytes of each message type received.",
|
||||
}, append(labels, "message_type")).With(labelsAndValues...),
|
||||
MessageSendBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "message_send_bytes_total",
|
||||
Help: "Number of bytes of each message type sent.",
|
||||
}, append(labels, "message_type")).With(labelsAndValues...),
|
||||
}
|
||||
}
|
||||
|
||||
func NopMetrics() *Metrics {
|
||||
return &Metrics{
|
||||
Peers: discard.NewGauge(),
|
||||
PeerReceiveBytesTotal: discard.NewCounter(),
|
||||
PeerSendBytesTotal: discard.NewCounter(),
|
||||
PeerPendingSendBytes: discard.NewGauge(),
|
||||
NumTxs: discard.NewGauge(),
|
||||
Peers: discard.NewGauge(),
|
||||
PeerReceiveBytesTotal: discard.NewCounter(),
|
||||
PeerSendBytesTotal: discard.NewCounter(),
|
||||
PeerPendingSendBytes: discard.NewGauge(),
|
||||
NumTxs: discard.NewGauge(),
|
||||
MessageReceiveBytesTotal: discard.NewCounter(),
|
||||
MessageSendBytesTotal: discard.NewCounter(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,4 +24,8 @@ type Metrics struct {
|
||||
PeerPendingSendBytes metrics.Gauge `metrics_labels:"peer_id"`
|
||||
// Number of transactions submitted by each peer.
|
||||
NumTxs metrics.Gauge `metrics_labels:"peer_id"`
|
||||
// Number of bytes of each message type received.
|
||||
MessageReceiveBytesTotal metrics.Counter `metrics_labels:"message_type"`
|
||||
// Number of bytes of each message type sent.
|
||||
MessageSendBytesTotal metrics.Counter `metrics_labels:"message_type"`
|
||||
}
|
||||
|
||||
@@ -42,9 +42,9 @@ func NewPeer(ip net.IP) *Peer {
|
||||
return mp
|
||||
}
|
||||
|
||||
func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error
|
||||
func (mp *Peer) TrySend(chID byte, msgBytes []byte) bool { return true }
|
||||
func (mp *Peer) Send(chID byte, msgBytes []byte) bool { return true }
|
||||
func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error
|
||||
func (mp *Peer) TrySend(e p2p.Envelope) bool { return true }
|
||||
func (mp *Peer) Send(e p2p.Envelope) bool { return true }
|
||||
func (mp *Peer) NodeInfo() p2p.NodeInfo {
|
||||
return p2p.DefaultNodeInfo{
|
||||
DefaultNodeID: mp.addr.ID,
|
||||
@@ -68,3 +68,5 @@ func (mp *Peer) RemoteIP() net.IP { return mp.ip }
|
||||
func (mp *Peer) SocketAddr() *p2p.NetAddress { return mp.addr }
|
||||
func (mp *Peer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
|
||||
func (mp *Peer) CloseConn() error { return nil }
|
||||
func (mp *Peer) SetRemovalFailed() {}
|
||||
func (mp *Peer) GetRemovalFailed() bool { return false }
|
||||
|
||||
@@ -19,7 +19,7 @@ func NewReactor() *Reactor {
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
|
||||
func (r *Reactor) AddPeer(peer p2p.Peer) {}
|
||||
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
|
||||
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {}
|
||||
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
|
||||
func (r *Reactor) AddPeer(peer p2p.Peer) {}
|
||||
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
|
||||
func (r *Reactor) Receive(e p2p.Envelope) {}
|
||||
|
||||
@@ -53,6 +53,20 @@ func (_m *Peer) Get(_a0 string) interface{} {
|
||||
return r0
|
||||
}
|
||||
|
||||
// GetRemovalFailed provides a mock function with given fields:
|
||||
func (_m *Peer) GetRemovalFailed() bool {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 bool
|
||||
if rf, ok := ret.Get(0).(func() bool); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Get(0).(bool)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// ID provides a mock function with given fields:
|
||||
func (_m *Peer) ID() p2p.ID {
|
||||
ret := _m.Called()
|
||||
@@ -220,13 +234,13 @@ func (_m *Peer) Reset() error {
|
||||
return r0
|
||||
}
|
||||
|
||||
// Send provides a mock function with given fields: _a0, _a1
|
||||
func (_m *Peer) Send(_a0 byte, _a1 []byte) bool {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
// Send provides a mock function with given fields: _a0
|
||||
func (_m *Peer) Send(_a0 p2p.Envelope) bool {
|
||||
ret := _m.Called(_a0)
|
||||
|
||||
var r0 bool
|
||||
if rf, ok := ret.Get(0).(func(byte, []byte) bool); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
if rf, ok := ret.Get(0).(func(p2p.Envelope) bool); ok {
|
||||
r0 = rf(_a0)
|
||||
} else {
|
||||
r0 = ret.Get(0).(bool)
|
||||
}
|
||||
@@ -244,6 +258,11 @@ func (_m *Peer) SetLogger(_a0 log.Logger) {
|
||||
_m.Called(_a0)
|
||||
}
|
||||
|
||||
// SetRemovalFailed provides a mock function with given fields:
|
||||
func (_m *Peer) SetRemovalFailed() {
|
||||
_m.Called()
|
||||
}
|
||||
|
||||
// SocketAddr provides a mock function with given fields:
|
||||
func (_m *Peer) SocketAddr() *p2p.NetAddress {
|
||||
ret := _m.Called()
|
||||
@@ -316,13 +335,13 @@ func (_m *Peer) String() string {
|
||||
return r0
|
||||
}
|
||||
|
||||
// TrySend provides a mock function with given fields: _a0, _a1
|
||||
func (_m *Peer) TrySend(_a0 byte, _a1 []byte) bool {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
// TrySend provides a mock function with given fields: _a0
|
||||
func (_m *Peer) TrySend(_a0 p2p.Envelope) bool {
|
||||
ret := _m.Called(_a0)
|
||||
|
||||
var r0 bool
|
||||
if rf, ok := ret.Get(0).(func(byte, []byte) bool); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
if rf, ok := ret.Get(0).(func(p2p.Envelope) bool); ok {
|
||||
r0 = rf(_a0)
|
||||
} else {
|
||||
r0 = ret.Get(0).(bool)
|
||||
}
|
||||
|
||||
84
p2p/peer.go
84
p2p/peer.go
@@ -5,6 +5,8 @@ import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/cmap"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
@@ -34,11 +36,14 @@ type Peer interface {
|
||||
Status() tmconn.ConnectionStatus
|
||||
SocketAddr() *NetAddress // actual address of the socket
|
||||
|
||||
Send(byte, []byte) bool
|
||||
TrySend(byte, []byte) bool
|
||||
Send(Envelope) bool
|
||||
TrySend(Envelope) bool
|
||||
|
||||
Set(string, interface{})
|
||||
Get(string) interface{}
|
||||
|
||||
SetRemovalFailed()
|
||||
GetRemovalFailed() bool
|
||||
}
|
||||
|
||||
//----------------------------------------------------------
|
||||
@@ -117,6 +122,9 @@ type peer struct {
|
||||
|
||||
metrics *Metrics
|
||||
metricsTicker *time.Ticker
|
||||
|
||||
// When removal of a peer fails, we set this flag
|
||||
removalAttemptFailed bool
|
||||
}
|
||||
|
||||
type PeerOption func(*peer)
|
||||
@@ -126,6 +134,7 @@ func newPeer(
|
||||
mConfig tmconn.MConnConfig,
|
||||
nodeInfo NodeInfo,
|
||||
reactorsByCh map[byte]Reactor,
|
||||
msgTypeByChID map[byte]proto.Message,
|
||||
chDescs []*tmconn.ChannelDescriptor,
|
||||
onPeerError func(Peer, interface{}),
|
||||
options ...PeerOption,
|
||||
@@ -143,6 +152,7 @@ func newPeer(
|
||||
pc.conn,
|
||||
p,
|
||||
reactorsByCh,
|
||||
msgTypeByChID,
|
||||
chDescs,
|
||||
onPeerError,
|
||||
mConfig,
|
||||
@@ -243,19 +253,31 @@ func (p *peer) Status() tmconn.ConnectionStatus {
|
||||
|
||||
// Send msg bytes to the channel identified by chID byte. Returns false if the
|
||||
// send queue is full after timeout, specified by MConnection.
|
||||
func (p *peer) Send(chID byte, msgBytes []byte) bool {
|
||||
func (p *peer) Send(e Envelope) bool {
|
||||
if !p.IsRunning() {
|
||||
// see Switch#Broadcast, where we fetch the list of peers and loop over
|
||||
// them - while we're looping, one peer may be removed and stopped.
|
||||
return false
|
||||
} else if !p.hasChannel(chID) {
|
||||
} else if !p.hasChannel(e.ChannelID) {
|
||||
return false
|
||||
}
|
||||
res := p.mconn.Send(chID, msgBytes)
|
||||
msg := e.Message
|
||||
if w, ok := msg.(Wrapper); ok {
|
||||
var err error
|
||||
msg, err = w.Wrap()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
msgBytes, err := proto.Marshal(msg)
|
||||
if err != nil {
|
||||
panic(err) // Q: should this panic or error?
|
||||
}
|
||||
res := p.mconn.Send(e.ChannelID, msgBytes)
|
||||
if res {
|
||||
labels := []string{
|
||||
"peer_id", string(p.ID()),
|
||||
"chID", fmt.Sprintf("%#x", chID),
|
||||
"chID", fmt.Sprintf("%#x", e.ChannelID),
|
||||
}
|
||||
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
|
||||
}
|
||||
@@ -264,17 +286,29 @@ func (p *peer) Send(chID byte, msgBytes []byte) bool {
|
||||
|
||||
// TrySend msg bytes to the channel identified by chID byte. Immediately returns
|
||||
// false if the send queue is full.
|
||||
func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
|
||||
func (p *peer) TrySend(e Envelope) bool {
|
||||
if !p.IsRunning() {
|
||||
return false
|
||||
} else if !p.hasChannel(chID) {
|
||||
} else if !p.hasChannel(e.ChannelID) {
|
||||
return false
|
||||
}
|
||||
res := p.mconn.TrySend(chID, msgBytes)
|
||||
msg := e.Message
|
||||
if w, ok := msg.(Wrapper); ok {
|
||||
var err error
|
||||
msg, err = w.Wrap()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
msgBytes, err := proto.Marshal(msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
res := p.mconn.TrySend(e.ChannelID, msgBytes)
|
||||
if res {
|
||||
labels := []string{
|
||||
"peer_id", string(p.ID()),
|
||||
"chID", fmt.Sprintf("%#x", chID),
|
||||
"chID", fmt.Sprintf("%#x", e.ChannelID),
|
||||
}
|
||||
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
|
||||
}
|
||||
@@ -316,6 +350,14 @@ func (p *peer) CloseConn() error {
|
||||
return p.peerConn.conn.Close()
|
||||
}
|
||||
|
||||
func (p *peer) SetRemovalFailed() {
|
||||
p.removalAttemptFailed = true
|
||||
}
|
||||
|
||||
func (p *peer) GetRemovalFailed() bool {
|
||||
return p.removalAttemptFailed
|
||||
}
|
||||
|
||||
//---------------------------------------------------
|
||||
// methods only used for testing
|
||||
// TODO: can we remove these?
|
||||
@@ -370,6 +412,7 @@ func createMConnection(
|
||||
conn net.Conn,
|
||||
p *peer,
|
||||
reactorsByCh map[byte]Reactor,
|
||||
msgTypeByChID map[byte]proto.Message,
|
||||
chDescs []*tmconn.ChannelDescriptor,
|
||||
onPeerError func(Peer, interface{}),
|
||||
config tmconn.MConnConfig,
|
||||
@@ -382,12 +425,31 @@ func createMConnection(
|
||||
// which does onPeerError.
|
||||
panic(fmt.Sprintf("Unknown channel %X", chID))
|
||||
}
|
||||
mt := msgTypeByChID[chID]
|
||||
msg := proto.Clone(mt)
|
||||
err := proto.Unmarshal(msgBytes, msg)
|
||||
if err != nil {
|
||||
// TODO(williambanfield) add a log line
|
||||
return
|
||||
}
|
||||
labels := []string{
|
||||
"peer_id", string(p.ID()),
|
||||
"chID", fmt.Sprintf("%#x", chID),
|
||||
}
|
||||
if w, ok := msg.(Unwrapper); ok {
|
||||
msg, err = w.Unwrap()
|
||||
if err != nil {
|
||||
// TODO(williambanfield) add error log line.
|
||||
return
|
||||
}
|
||||
}
|
||||
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
|
||||
reactor.Receive(chID, p, msgBytes)
|
||||
p.metrics.MessageReceiveBytesTotal.With("message_type", "tmp").Add(float64(len(msgBytes)))
|
||||
reactor.Receive(Envelope{
|
||||
ChannelID: chID,
|
||||
Src: p,
|
||||
Message: msg,
|
||||
})
|
||||
}
|
||||
|
||||
onError := func(r interface{}) {
|
||||
|
||||
@@ -47,6 +47,9 @@ func (ps *PeerSet) Add(peer Peer) error {
|
||||
if ps.lookup[peer.ID()] != nil {
|
||||
return ErrSwitchDuplicatePeerID{peer.ID()}
|
||||
}
|
||||
if peer.GetRemovalFailed() {
|
||||
return ErrPeerRemoval{}
|
||||
}
|
||||
|
||||
index := len(ps.list)
|
||||
// Appending is safe even with other goroutines
|
||||
@@ -107,6 +110,12 @@ func (ps *PeerSet) Remove(peer Peer) bool {
|
||||
|
||||
item := ps.lookup[peer.ID()]
|
||||
if item == nil {
|
||||
// Removing the peer has failed so we set a flag to mark that a removal was attempted.
|
||||
// This can happen when the peer add routine from the switch is running in
|
||||
// parallel to the receive routine of MConn.
|
||||
// There is an error within MConn but the switch has not actually added the peer to the peer set yet.
|
||||
// Setting this flag will prevent a peer from being added to a node's peer set afterwards.
|
||||
peer.SetRemovalFailed()
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
@@ -18,20 +18,22 @@ type mockPeer struct {
|
||||
id ID
|
||||
}
|
||||
|
||||
func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
|
||||
func (mp *mockPeer) TrySend(chID byte, msgBytes []byte) bool { return true }
|
||||
func (mp *mockPeer) Send(chID byte, msgBytes []byte) bool { return true }
|
||||
func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} }
|
||||
func (mp *mockPeer) Status() ConnectionStatus { return ConnectionStatus{} }
|
||||
func (mp *mockPeer) ID() ID { return mp.id }
|
||||
func (mp *mockPeer) IsOutbound() bool { return false }
|
||||
func (mp *mockPeer) IsPersistent() bool { return true }
|
||||
func (mp *mockPeer) Get(s string) interface{} { return s }
|
||||
func (mp *mockPeer) Set(string, interface{}) {}
|
||||
func (mp *mockPeer) RemoteIP() net.IP { return mp.ip }
|
||||
func (mp *mockPeer) SocketAddr() *NetAddress { return nil }
|
||||
func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
|
||||
func (mp *mockPeer) CloseConn() error { return nil }
|
||||
func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
|
||||
func (mp *mockPeer) TrySend(e Envelope) bool { return true }
|
||||
func (mp *mockPeer) Send(e Envelope) bool { return true }
|
||||
func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} }
|
||||
func (mp *mockPeer) Status() ConnectionStatus { return ConnectionStatus{} }
|
||||
func (mp *mockPeer) ID() ID { return mp.id }
|
||||
func (mp *mockPeer) IsOutbound() bool { return false }
|
||||
func (mp *mockPeer) IsPersistent() bool { return true }
|
||||
func (mp *mockPeer) Get(s string) interface{} { return s }
|
||||
func (mp *mockPeer) Set(string, interface{}) {}
|
||||
func (mp *mockPeer) RemoteIP() net.IP { return mp.ip }
|
||||
func (mp *mockPeer) SocketAddr() *NetAddress { return nil }
|
||||
func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
|
||||
func (mp *mockPeer) CloseConn() error { return nil }
|
||||
func (mp *mockPeer) SetRemovalFailed() {}
|
||||
func (mp *mockPeer) GetRemovalFailed() bool { return false }
|
||||
|
||||
// Returns a mock peer
|
||||
func newMockPeer(ip net.IP) *mockPeer {
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
@@ -14,6 +15,7 @@ import (
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
"github.com/tendermint/tendermint/libs/bytes"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/proto/tendermint/p2p"
|
||||
|
||||
"github.com/tendermint/tendermint/config"
|
||||
tmconn "github.com/tendermint/tendermint/p2p/conn"
|
||||
@@ -70,7 +72,7 @@ func TestPeerSend(t *testing.T) {
|
||||
})
|
||||
|
||||
assert.True(p.CanSend(testCh))
|
||||
assert.True(p.Send(testCh, []byte("Asylum")))
|
||||
assert.True(p.Send(Envelope{ChannelID: testCh, Message: &p2p.Message{}}))
|
||||
}
|
||||
|
||||
func createOutboundPeerAndPerformHandshake(
|
||||
@@ -82,6 +84,9 @@ func createOutboundPeerAndPerformHandshake(
|
||||
{ID: testCh, Priority: 1},
|
||||
}
|
||||
reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)}
|
||||
msgTypeByChID := map[byte]proto.Message{
|
||||
testCh: &p2p.Message{},
|
||||
}
|
||||
pk := ed25519.GenPrivKey()
|
||||
pc, err := testOutboundPeerConn(addr, config, false, pk)
|
||||
if err != nil {
|
||||
@@ -94,7 +99,7 @@ func createOutboundPeerAndPerformHandshake(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, chDescs, func(p Peer, r interface{}) {})
|
||||
p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, msgTypeByChID, chDescs, func(p Peer, r interface{}) {})
|
||||
p.SetLogger(log.TestingLogger().With("peer", addr))
|
||||
return p, nil
|
||||
}
|
||||
|
||||
@@ -6,8 +6,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/cmap"
|
||||
tmmath "github.com/tendermint/tendermint/libs/math"
|
||||
tmrand "github.com/tendermint/tendermint/libs/rand"
|
||||
@@ -184,6 +182,7 @@ func (r *Reactor) GetChannels() []*conn.ChannelDescriptor {
|
||||
Priority: 1,
|
||||
SendQueueCapacity: 10,
|
||||
RecvMessageCapacity: maxMsgSize,
|
||||
MessageType: &tmp2p.Message{},
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -236,16 +235,10 @@ func (r *Reactor) logErrAddrBook(err error) {
|
||||
}
|
||||
|
||||
// Receive implements Reactor by handling incoming PEX messages.
|
||||
func (r *Reactor) Receive(chID byte, src Peer, msgBytes []byte) {
|
||||
msg, err := decodeMsg(msgBytes)
|
||||
if err != nil {
|
||||
r.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
|
||||
r.Switch.StopPeerForError(src, err)
|
||||
return
|
||||
}
|
||||
r.Logger.Debug("Received message", "src", src, "chId", chID, "msg", msg)
|
||||
func (r *Reactor) Receive(e p2p.Envelope) {
|
||||
r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
|
||||
|
||||
switch msg := msg.(type) {
|
||||
switch msg := e.Message.(type) {
|
||||
case *tmp2p.PexRequest:
|
||||
|
||||
// NOTE: this is a prime candidate for amplification attacks,
|
||||
@@ -255,8 +248,8 @@ func (r *Reactor) Receive(chID byte, src Peer, msgBytes []byte) {
|
||||
|
||||
// If we're a seed and this is an inbound peer,
|
||||
// respond once and disconnect.
|
||||
if r.config.SeedMode && !src.IsOutbound() {
|
||||
id := string(src.ID())
|
||||
if r.config.SeedMode && !e.Src.IsOutbound() {
|
||||
id := string(e.Src.ID())
|
||||
v := r.lastReceivedRequests.Get(id)
|
||||
if v != nil {
|
||||
// FlushStop/StopPeer are already
|
||||
@@ -266,36 +259,36 @@ func (r *Reactor) Receive(chID byte, src Peer, msgBytes []byte) {
|
||||
r.lastReceivedRequests.Set(id, time.Now())
|
||||
|
||||
// Send addrs and disconnect
|
||||
r.SendAddrs(src, r.book.GetSelectionWithBias(biasToSelectNewPeers))
|
||||
r.SendAddrs(e.Src, r.book.GetSelectionWithBias(biasToSelectNewPeers))
|
||||
go func() {
|
||||
// In a go-routine so it doesn't block .Receive.
|
||||
src.FlushStop()
|
||||
r.Switch.StopPeerGracefully(src)
|
||||
e.Src.FlushStop()
|
||||
r.Switch.StopPeerGracefully(e.Src)
|
||||
}()
|
||||
|
||||
} else {
|
||||
// Check we're not receiving requests too frequently.
|
||||
if err := r.receiveRequest(src); err != nil {
|
||||
r.Switch.StopPeerForError(src, err)
|
||||
r.book.MarkBad(src.SocketAddr(), defaultBanTime)
|
||||
if err := r.receiveRequest(e.Src); err != nil {
|
||||
r.Switch.StopPeerForError(e.Src, err)
|
||||
r.book.MarkBad(e.Src.SocketAddr(), defaultBanTime)
|
||||
return
|
||||
}
|
||||
r.SendAddrs(src, r.book.GetSelection())
|
||||
r.SendAddrs(e.Src, r.book.GetSelection())
|
||||
}
|
||||
|
||||
case *tmp2p.PexAddrs:
|
||||
// If we asked for addresses, add them to the book
|
||||
addrs, err := p2p.NetAddressesFromProto(msg.Addrs)
|
||||
if err != nil {
|
||||
r.Switch.StopPeerForError(src, err)
|
||||
r.book.MarkBad(src.SocketAddr(), defaultBanTime)
|
||||
r.Switch.StopPeerForError(e.Src, err)
|
||||
r.book.MarkBad(e.Src.SocketAddr(), defaultBanTime)
|
||||
return
|
||||
}
|
||||
err = r.ReceiveAddrs(addrs, src)
|
||||
err = r.ReceiveAddrs(addrs, e.Src)
|
||||
if err != nil {
|
||||
r.Switch.StopPeerForError(src, err)
|
||||
r.Switch.StopPeerForError(e.Src, err)
|
||||
if err == ErrUnsolicitedList {
|
||||
r.book.MarkBad(src.SocketAddr(), defaultBanTime)
|
||||
r.book.MarkBad(e.Src.SocketAddr(), defaultBanTime)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -348,7 +341,10 @@ func (r *Reactor) RequestAddrs(p Peer) {
|
||||
}
|
||||
r.Logger.Debug("Request addrs", "from", p)
|
||||
r.requestsSent.Set(id, struct{}{})
|
||||
p.Send(PexChannel, mustEncode(&tmp2p.PexRequest{}))
|
||||
p.Send(p2p.Envelope{
|
||||
ChannelID: PexChannel,
|
||||
Message: &tmp2p.PexRequest{},
|
||||
})
|
||||
}
|
||||
|
||||
// ReceiveAddrs adds the given addrs to the addrbook if theres an open
|
||||
@@ -406,7 +402,11 @@ func (r *Reactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error {
|
||||
|
||||
// SendAddrs sends addrs to the peer.
|
||||
func (r *Reactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) {
|
||||
p.Send(PexChannel, mustEncode(&tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)}))
|
||||
e := p2p.Envelope{
|
||||
ChannelID: PexChannel,
|
||||
Message: &tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)},
|
||||
}
|
||||
p.Send(e)
|
||||
}
|
||||
|
||||
// SetEnsurePeersPeriod sets period to ensure peers connected.
|
||||
@@ -763,43 +763,3 @@ func markAddrInBookBasedOnErr(addr *p2p.NetAddress, book AddrBook, err error) {
|
||||
book.MarkAttempt(addr)
|
||||
}
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Messages
|
||||
|
||||
// mustEncode proto encodes a tmp2p.Message
|
||||
func mustEncode(pb proto.Message) []byte {
|
||||
msg := tmp2p.Message{}
|
||||
switch pb := pb.(type) {
|
||||
case *tmp2p.PexRequest:
|
||||
msg.Sum = &tmp2p.Message_PexRequest{PexRequest: pb}
|
||||
case *tmp2p.PexAddrs:
|
||||
msg.Sum = &tmp2p.Message_PexAddrs{PexAddrs: pb}
|
||||
default:
|
||||
panic(fmt.Sprintf("Unknown message type %T", pb))
|
||||
}
|
||||
|
||||
bz, err := msg.Marshal()
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("unable to marshal %T: %w", pb, err))
|
||||
}
|
||||
return bz
|
||||
}
|
||||
|
||||
func decodeMsg(bz []byte) (proto.Message, error) {
|
||||
pb := &tmp2p.Message{}
|
||||
|
||||
err := pb.Unmarshal(bz)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch msg := pb.Sum.(type) {
|
||||
case *tmp2p.Message_PexRequest:
|
||||
return msg.PexRequest, nil
|
||||
case *tmp2p.Message_PexAddrs:
|
||||
return msg.PexAddrs, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message: %T", msg)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -131,12 +131,11 @@ func TestPEXReactorReceive(t *testing.T) {
|
||||
r.RequestAddrs(peer)
|
||||
|
||||
size := book.Size()
|
||||
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
|
||||
r.Receive(PexChannel, peer, msg)
|
||||
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
|
||||
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
|
||||
assert.Equal(t, size+1, book.Size())
|
||||
|
||||
msg = mustEncode(&tmp2p.PexRequest{})
|
||||
r.Receive(PexChannel, peer, msg) // should not panic.
|
||||
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
|
||||
}
|
||||
|
||||
func TestPEXReactorRequestMessageAbuse(t *testing.T) {
|
||||
@@ -155,20 +154,19 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) {
|
||||
require.True(t, book.HasAddress(peerAddr))
|
||||
|
||||
id := string(peer.ID())
|
||||
msg := mustEncode(&tmp2p.PexRequest{})
|
||||
|
||||
// first time creates the entry
|
||||
r.Receive(PexChannel, peer, msg)
|
||||
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
|
||||
assert.True(t, r.lastReceivedRequests.Has(id))
|
||||
assert.True(t, sw.Peers().Has(peer.ID()))
|
||||
|
||||
// next time sets the last time value
|
||||
r.Receive(PexChannel, peer, msg)
|
||||
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
|
||||
assert.True(t, r.lastReceivedRequests.Has(id))
|
||||
assert.True(t, sw.Peers().Has(peer.ID()))
|
||||
|
||||
// third time is too many too soon - peer is removed
|
||||
r.Receive(PexChannel, peer, msg)
|
||||
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
|
||||
assert.False(t, r.lastReceivedRequests.Has(id))
|
||||
assert.False(t, sw.Peers().Has(peer.ID()))
|
||||
assert.True(t, book.IsBanned(peerAddr))
|
||||
@@ -192,15 +190,15 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
|
||||
assert.True(t, r.requestsSent.Has(id))
|
||||
assert.True(t, sw.Peers().Has(peer.ID()))
|
||||
|
||||
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
|
||||
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
|
||||
|
||||
// receive some addrs. should clear the request
|
||||
r.Receive(PexChannel, peer, msg)
|
||||
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
|
||||
assert.False(t, r.requestsSent.Has(id))
|
||||
assert.True(t, sw.Peers().Has(peer.ID()))
|
||||
|
||||
// receiving more unsolicited addrs causes a disconnect and ban
|
||||
r.Receive(PexChannel, peer, msg)
|
||||
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
|
||||
assert.False(t, sw.Peers().Has(peer.ID()))
|
||||
assert.True(t, book.IsBanned(peer.SocketAddr()))
|
||||
}
|
||||
@@ -486,8 +484,12 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
|
||||
pexR.RequestAddrs(peer)
|
||||
|
||||
size := book.Size()
|
||||
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
|
||||
pexR.Receive(PexChannel, peer, msg)
|
||||
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
|
||||
pexR.Receive(p2p.Envelope{
|
||||
ChannelID: PexChannel,
|
||||
Src: peer,
|
||||
Message: msg,
|
||||
})
|
||||
assert.Equal(t, size, book.Size())
|
||||
|
||||
pexR.AddPeer(peer)
|
||||
@@ -695,7 +697,10 @@ func TestPexVectors(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
|
||||
bz := mustEncode(tc.msg)
|
||||
w, err := tc.msg.(p2p.Wrapper).Wrap()
|
||||
require.NoError(t, err)
|
||||
bz, err := proto.Marshal(w)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
|
||||
}
|
||||
|
||||
@@ -6,9 +6,9 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/libs/cmap"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/libs/rand"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
"github.com/tendermint/tendermint/p2p/conn"
|
||||
@@ -69,16 +69,17 @@ type PeerFilterFunc func(IPeerSet, Peer) error
|
||||
type Switch struct {
|
||||
service.BaseService
|
||||
|
||||
config *config.P2PConfig
|
||||
reactors map[string]Reactor
|
||||
chDescs []*conn.ChannelDescriptor
|
||||
reactorsByCh map[byte]Reactor
|
||||
peers *PeerSet
|
||||
dialing *cmap.CMap
|
||||
reconnecting *cmap.CMap
|
||||
nodeInfo NodeInfo // our node info
|
||||
nodeKey *NodeKey // our node privkey
|
||||
addrBook AddrBook
|
||||
config *config.P2PConfig
|
||||
reactors map[string]Reactor
|
||||
chDescs []*conn.ChannelDescriptor
|
||||
reactorsByCh map[byte]Reactor
|
||||
msgTypeByChID map[byte]proto.Message
|
||||
peers *PeerSet
|
||||
dialing *cmap.CMap
|
||||
reconnecting *cmap.CMap
|
||||
nodeInfo NodeInfo // our node info
|
||||
nodeKey *NodeKey // our node privkey
|
||||
addrBook AddrBook
|
||||
// peers addresses with whom we'll maintain constant connection
|
||||
persistentPeersAddrs []*NetAddress
|
||||
unconditionalPeerIDs map[ID]struct{}
|
||||
@@ -113,6 +114,7 @@ func NewSwitch(
|
||||
reactors: make(map[string]Reactor),
|
||||
chDescs: make([]*conn.ChannelDescriptor, 0),
|
||||
reactorsByCh: make(map[byte]Reactor),
|
||||
msgTypeByChID: make(map[byte]proto.Message),
|
||||
peers: NewPeerSet(),
|
||||
dialing: cmap.NewCMap(),
|
||||
reconnecting: cmap.NewCMap(),
|
||||
@@ -164,6 +166,7 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
|
||||
}
|
||||
sw.chDescs = append(sw.chDescs, chDesc)
|
||||
sw.reactorsByCh[chID] = reactor
|
||||
sw.msgTypeByChID[chID] = chDesc.MessageType
|
||||
}
|
||||
sw.reactors[name] = reactor
|
||||
reactor.SetSwitch(sw)
|
||||
@@ -182,6 +185,7 @@ func (sw *Switch) RemoveReactor(name string, reactor Reactor) {
|
||||
}
|
||||
}
|
||||
delete(sw.reactorsByCh, chDesc.ID)
|
||||
delete(sw.msgTypeByChID, chDesc.ID)
|
||||
}
|
||||
delete(sw.reactors, name)
|
||||
reactor.SetSwitch(nil)
|
||||
@@ -261,8 +265,8 @@ func (sw *Switch) OnStop() {
|
||||
// closed once msg bytes are sent to all peers (or time out).
|
||||
//
|
||||
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
|
||||
func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
|
||||
sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", log.NewLazySprintf("%X", msgBytes))
|
||||
func (sw *Switch) Broadcast(e Envelope) chan bool {
|
||||
sw.Logger.Debug("Broadcast", "channel", e.ChannelID)
|
||||
|
||||
peers := sw.peers.List()
|
||||
var wg sync.WaitGroup
|
||||
@@ -272,7 +276,7 @@ func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
|
||||
for _, peer := range peers {
|
||||
go func(p Peer) {
|
||||
defer wg.Done()
|
||||
success := p.Send(chID, msgBytes)
|
||||
success := p.Send(e)
|
||||
successChan <- success
|
||||
}(peer)
|
||||
}
|
||||
@@ -370,6 +374,10 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
|
||||
// https://github.com/tendermint/tendermint/issues/3338
|
||||
if sw.peers.Remove(peer) {
|
||||
sw.metrics.Peers.Add(float64(-1))
|
||||
} else {
|
||||
// Removal of the peer has failed. The function above sets a flag within the peer to mark this.
|
||||
// We keep this message here as information to the developer.
|
||||
sw.Logger.Debug("error on peer removal", ",", "peer", peer.ID())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -619,11 +627,12 @@ func (sw *Switch) IsPeerPersistent(na *NetAddress) bool {
|
||||
func (sw *Switch) acceptRoutine() {
|
||||
for {
|
||||
p, err := sw.transport.Accept(peerConfig{
|
||||
chDescs: sw.chDescs,
|
||||
onPeerError: sw.StopPeerForError,
|
||||
reactorsByCh: sw.reactorsByCh,
|
||||
metrics: sw.metrics,
|
||||
isPersistent: sw.IsPeerPersistent,
|
||||
chDescs: sw.chDescs,
|
||||
onPeerError: sw.StopPeerForError,
|
||||
reactorsByCh: sw.reactorsByCh,
|
||||
msgTypeByChID: sw.msgTypeByChID,
|
||||
metrics: sw.metrics,
|
||||
isPersistent: sw.IsPeerPersistent,
|
||||
})
|
||||
if err != nil {
|
||||
switch err := err.(type) {
|
||||
@@ -722,11 +731,12 @@ func (sw *Switch) addOutboundPeerWithConfig(
|
||||
}
|
||||
|
||||
p, err := sw.transport.Dial(*addr, peerConfig{
|
||||
chDescs: sw.chDescs,
|
||||
onPeerError: sw.StopPeerForError,
|
||||
isPersistent: sw.IsPeerPersistent,
|
||||
reactorsByCh: sw.reactorsByCh,
|
||||
metrics: sw.metrics,
|
||||
chDescs: sw.chDescs,
|
||||
onPeerError: sw.StopPeerForError,
|
||||
isPersistent: sw.IsPeerPersistent,
|
||||
reactorsByCh: sw.reactorsByCh,
|
||||
msgTypeByChID: sw.msgTypeByChID,
|
||||
metrics: sw.metrics,
|
||||
})
|
||||
if err != nil {
|
||||
if e, ok := err.(ErrRejected); ok {
|
||||
@@ -824,6 +834,12 @@ func (sw *Switch) addPeer(p Peer) error {
|
||||
// so that if Receive errors, we will find the peer and remove it.
|
||||
// Add should not err since we already checked peers.Has().
|
||||
if err := sw.peers.Add(p); err != nil {
|
||||
switch err.(type) {
|
||||
case ErrPeerRemoval:
|
||||
sw.Logger.Error("Error starting peer ",
|
||||
" err ", "Peer has already errored and removal was attempted.",
|
||||
"peer", p.ID())
|
||||
}
|
||||
return err
|
||||
}
|
||||
sw.metrics.Peers.Add(float64(1))
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -23,6 +24,8 @@ import (
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmsync "github.com/tendermint/tendermint/libs/sync"
|
||||
"github.com/tendermint/tendermint/p2p/conn"
|
||||
"github.com/tendermint/tendermint/proto/tendermint/p2p"
|
||||
p2pproto "github.com/tendermint/tendermint/proto/tendermint/p2p"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -36,9 +39,8 @@ func init() {
|
||||
}
|
||||
|
||||
type PeerMessage struct {
|
||||
PeerID ID
|
||||
Bytes []byte
|
||||
Counter int
|
||||
Contents proto.Message
|
||||
Counter int
|
||||
}
|
||||
|
||||
type TestReactor struct {
|
||||
@@ -70,12 +72,12 @@ func (tr *TestReactor) AddPeer(peer Peer) {}
|
||||
|
||||
func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {}
|
||||
|
||||
func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
|
||||
func (tr *TestReactor) Receive(e Envelope) {
|
||||
if tr.logMessages {
|
||||
tr.mtx.Lock()
|
||||
defer tr.mtx.Unlock()
|
||||
// fmt.Printf("Received: %X, %X\n", chID, msgBytes)
|
||||
tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
|
||||
fmt.Printf("Received: %X, %X\n", e.ChannelID, e.Message)
|
||||
tr.msgsReceived[e.ChannelID] = append(tr.msgsReceived[e.ChannelID], PeerMessage{Contents: e.Message, Counter: tr.msgsCounter})
|
||||
tr.msgsCounter++
|
||||
}
|
||||
}
|
||||
@@ -103,12 +105,12 @@ func initSwitchFunc(i int, sw *Switch) *Switch {
|
||||
|
||||
// Make two reactors of two channels each
|
||||
sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
|
||||
{ID: byte(0x00), Priority: 10},
|
||||
{ID: byte(0x01), Priority: 10},
|
||||
{ID: byte(0x00), Priority: 10, MessageType: &p2pproto.Message{}},
|
||||
{ID: byte(0x01), Priority: 10, MessageType: &p2pproto.Message{}},
|
||||
}, true))
|
||||
sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
|
||||
{ID: byte(0x02), Priority: 10},
|
||||
{ID: byte(0x03), Priority: 10},
|
||||
{ID: byte(0x02), Priority: 10, MessageType: &p2pproto.Message{}},
|
||||
{ID: byte(0x03), Priority: 10, MessageType: &p2pproto.Message{}},
|
||||
}, true))
|
||||
|
||||
return sw
|
||||
@@ -135,31 +137,47 @@ func TestSwitches(t *testing.T) {
|
||||
}
|
||||
|
||||
// Lets send some messages
|
||||
ch0Msg := []byte("channel zero")
|
||||
ch1Msg := []byte("channel foo")
|
||||
ch2Msg := []byte("channel bar")
|
||||
|
||||
s1.Broadcast(byte(0x00), ch0Msg)
|
||||
s1.Broadcast(byte(0x01), ch1Msg)
|
||||
s1.Broadcast(byte(0x02), ch2Msg)
|
||||
|
||||
ch0Msg := &p2pproto.PexAddrs{
|
||||
Addrs: []p2p.NetAddress{
|
||||
{
|
||||
ID: "1",
|
||||
},
|
||||
},
|
||||
}
|
||||
ch1Msg := &p2pproto.PexAddrs{
|
||||
Addrs: []p2p.NetAddress{
|
||||
{
|
||||
ID: "1",
|
||||
},
|
||||
},
|
||||
}
|
||||
ch2Msg := &p2pproto.PexAddrs{
|
||||
Addrs: []p2p.NetAddress{
|
||||
{
|
||||
ID: "2",
|
||||
},
|
||||
},
|
||||
}
|
||||
s1.Broadcast(Envelope{ChannelID: byte(0x00), Message: ch0Msg})
|
||||
s1.Broadcast(Envelope{ChannelID: byte(0x01), Message: ch1Msg})
|
||||
s1.Broadcast(Envelope{ChannelID: byte(0x02), Message: ch2Msg})
|
||||
assertMsgReceivedWithTimeout(t,
|
||||
ch0Msg,
|
||||
byte(0x00),
|
||||
s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
|
||||
s2.Reactor("foo").(*TestReactor), 200*time.Millisecond, 5*time.Second)
|
||||
assertMsgReceivedWithTimeout(t,
|
||||
ch1Msg,
|
||||
byte(0x01),
|
||||
s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
|
||||
s2.Reactor("foo").(*TestReactor), 200*time.Millisecond, 5*time.Second)
|
||||
assertMsgReceivedWithTimeout(t,
|
||||
ch2Msg,
|
||||
byte(0x02),
|
||||
s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
|
||||
s2.Reactor("bar").(*TestReactor), 200*time.Millisecond, 5*time.Second)
|
||||
}
|
||||
|
||||
func assertMsgReceivedWithTimeout(
|
||||
t *testing.T,
|
||||
msgBytes []byte,
|
||||
msg proto.Message,
|
||||
channel byte,
|
||||
reactor *TestReactor,
|
||||
checkPeriod,
|
||||
@@ -170,9 +188,13 @@ func assertMsgReceivedWithTimeout(
|
||||
select {
|
||||
case <-ticker.C:
|
||||
msgs := reactor.getMsgs(channel)
|
||||
expectedBytes, err := proto.Marshal(msgs[0].Contents)
|
||||
require.NoError(t, err)
|
||||
gotBytes, err := proto.Marshal(msg)
|
||||
require.NoError(t, err)
|
||||
if len(msgs) > 0 {
|
||||
if !bytes.Equal(msgs[0].Bytes, msgBytes) {
|
||||
t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msgBytes, msgs[0].Bytes)
|
||||
if !bytes.Equal(expectedBytes, gotBytes) {
|
||||
t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msg, msgs[0].Counter)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -429,7 +451,10 @@ func TestSwitchStopPeerForError(t *testing.T) {
|
||||
|
||||
// send messages to the peer from sw1
|
||||
p := sw1.Peers().List()[0]
|
||||
p.Send(0x1, []byte("here's a message to send"))
|
||||
p.Send(Envelope{
|
||||
ChannelID: 0x1,
|
||||
Message: &p2p.Message{},
|
||||
})
|
||||
|
||||
// stop sw2. this should cause the p to fail,
|
||||
// which results in calling StopPeerForError internally
|
||||
@@ -824,7 +849,7 @@ func BenchmarkSwitchBroadcast(b *testing.B) {
|
||||
// Send random message from foo channel to another
|
||||
for i := 0; i < b.N; i++ {
|
||||
chID := byte(i % 4)
|
||||
successChan := s1.Broadcast(chID, []byte("test data"))
|
||||
successChan := s1.Broadcast(Envelope{ChannelID: chID})
|
||||
for s := range successChan {
|
||||
if s {
|
||||
numSuccess++
|
||||
@@ -836,3 +861,16 @@ func BenchmarkSwitchBroadcast(b *testing.B) {
|
||||
|
||||
b.Logf("success: %v, failure: %v", numSuccess, numFailure)
|
||||
}
|
||||
|
||||
func TestSwitchRemovalErr(t *testing.T) {
|
||||
|
||||
sw1, sw2 := MakeSwitchPair(t, func(i int, sw *Switch) *Switch {
|
||||
return initSwitchFunc(i, sw)
|
||||
})
|
||||
assert.Equal(t, len(sw1.Peers().List()), 1)
|
||||
p := sw1.Peers().List()[0]
|
||||
|
||||
sw2.StopPeerForError(p, fmt.Errorf("peer should error"))
|
||||
|
||||
assert.Equal(t, sw2.peers.Add(p).Error(), ErrPeerRemoval{}.Error())
|
||||
}
|
||||
|
||||
@@ -149,6 +149,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
|
||||
MConnConfig(sw.config),
|
||||
ni,
|
||||
sw.reactorsByCh,
|
||||
sw.msgTypeByChID,
|
||||
sw.chDescs,
|
||||
sw.StopPeerForError,
|
||||
)
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
"golang.org/x/net/netutil"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/libs/protoio"
|
||||
"github.com/tendermint/tendermint/p2p/conn"
|
||||
@@ -47,9 +48,10 @@ type peerConfig struct {
|
||||
// isPersistent allows you to set a function, which, given socket address
|
||||
// (for outbound peers) OR self-reported address (for inbound peers), tells
|
||||
// if the peer is persistent or not.
|
||||
isPersistent func(*NetAddress) bool
|
||||
reactorsByCh map[byte]Reactor
|
||||
metrics *Metrics
|
||||
isPersistent func(*NetAddress) bool
|
||||
reactorsByCh map[byte]Reactor
|
||||
msgTypeByChID map[byte]proto.Message
|
||||
metrics *Metrics
|
||||
}
|
||||
|
||||
// Transport emits and connects to Peers. The implementation of Peer is left to
|
||||
@@ -519,6 +521,7 @@ func (mt *MultiplexTransport) wrapPeer(
|
||||
mt.mConfig,
|
||||
ni,
|
||||
cfg.reactorsByCh,
|
||||
cfg.msgTypeByChID,
|
||||
cfg.chDescs,
|
||||
cfg.onPeerError,
|
||||
PeerMetrics(cfg.metrics),
|
||||
|
||||
32
p2p/types.go
32
p2p/types.go
@@ -1,8 +1,40 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/tendermint/tendermint/p2p/conn"
|
||||
tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
|
||||
)
|
||||
|
||||
type ChannelDescriptor = conn.ChannelDescriptor
|
||||
type ConnectionStatus = conn.ConnectionStatus
|
||||
|
||||
// Envelope contains a message with sender routing info.
|
||||
type Envelope struct {
|
||||
Src Peer // sender (empty if outbound)
|
||||
Message proto.Message // message payload
|
||||
ChannelID byte
|
||||
}
|
||||
|
||||
// Wrapper is a Protobuf message that can contain a variety of inner messages
|
||||
// (e.g. via oneof fields). If a Channel's message type implements Wrapper, the
|
||||
// Router will automatically wrap outbound messages and unwrap inbound messages,
|
||||
// such that reactors do not have to do this themselves.
|
||||
type Unwrapper interface {
|
||||
proto.Message
|
||||
|
||||
// Unwrap will unwrap the inner message contained in this message.
|
||||
Unwrap() (proto.Message, error)
|
||||
}
|
||||
|
||||
type Wrapper interface {
|
||||
proto.Message
|
||||
|
||||
// Wrap will take the underlying message and wrap it in its wrapper type.
|
||||
Wrap() (proto.Message, error)
|
||||
}
|
||||
|
||||
var (
|
||||
_ Wrapper = &tmp2p.PexRequest{}
|
||||
_ Wrapper = &tmp2p.PexAddrs{}
|
||||
)
|
||||
|
||||
73
proto/tendermint/blocksync/message.go
Normal file
73
proto/tendermint/blocksync/message.go
Normal file
@@ -0,0 +1,73 @@
|
||||
package blocksync
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
)
|
||||
|
||||
var _ p2p.Wrapper = &StatusRequest{}
|
||||
var _ p2p.Wrapper = &StatusResponse{}
|
||||
var _ p2p.Wrapper = &NoBlockResponse{}
|
||||
var _ p2p.Wrapper = &BlockResponse{}
|
||||
var _ p2p.Wrapper = &BlockRequest{}
|
||||
|
||||
const (
|
||||
BlockResponseMessagePrefixSize = 4
|
||||
BlockResponseMessageFieldKeySize = 1
|
||||
)
|
||||
|
||||
func (m *BlockRequest) Wrap() (proto.Message, error) {
|
||||
bm := &Message{}
|
||||
bm.Sum = &Message_BlockRequest{BlockRequest: m}
|
||||
return bm, nil
|
||||
}
|
||||
|
||||
func (m *BlockResponse) Wrap() (proto.Message, error) {
|
||||
bm := &Message{}
|
||||
bm.Sum = &Message_BlockResponse{BlockResponse: m}
|
||||
return bm, nil
|
||||
}
|
||||
|
||||
func (m *NoBlockResponse) Wrap() (proto.Message, error) {
|
||||
bm := &Message{}
|
||||
bm.Sum = &Message_NoBlockResponse{NoBlockResponse: m}
|
||||
return bm, nil
|
||||
}
|
||||
|
||||
func (m *StatusRequest) Wrap() (proto.Message, error) {
|
||||
bm := &Message{}
|
||||
bm.Sum = &Message_StatusRequest{StatusRequest: m}
|
||||
return bm, nil
|
||||
}
|
||||
|
||||
func (m *StatusResponse) Wrap() (proto.Message, error) {
|
||||
bm := &Message{}
|
||||
bm.Sum = &Message_StatusResponse{StatusResponse: m}
|
||||
return bm, nil
|
||||
}
|
||||
|
||||
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped blockchain
|
||||
// message.
|
||||
func (m *Message) Unwrap() (proto.Message, error) {
|
||||
switch msg := m.Sum.(type) {
|
||||
case *Message_BlockRequest:
|
||||
return m.GetBlockRequest(), nil
|
||||
|
||||
case *Message_BlockResponse:
|
||||
return m.GetBlockResponse(), nil
|
||||
|
||||
case *Message_NoBlockResponse:
|
||||
return m.GetNoBlockResponse(), nil
|
||||
|
||||
case *Message_StatusRequest:
|
||||
return m.GetStatusRequest(), nil
|
||||
|
||||
case *Message_StatusResponse:
|
||||
return m.GetStatusResponse(), nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message: %T", msg)
|
||||
}
|
||||
}
|
||||
109
proto/tendermint/consensus/message.go
Normal file
109
proto/tendermint/consensus/message.go
Normal file
@@ -0,0 +1,109 @@
|
||||
package consensus
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
)
|
||||
|
||||
var _ p2p.Wrapper = &VoteSetBits{}
|
||||
var _ p2p.Wrapper = &VoteSetMaj23{}
|
||||
var _ p2p.Wrapper = &Vote{}
|
||||
var _ p2p.Wrapper = &ProposalPOL{}
|
||||
var _ p2p.Wrapper = &Proposal{}
|
||||
var _ p2p.Wrapper = &NewValidBlock{}
|
||||
var _ p2p.Wrapper = &NewRoundStep{}
|
||||
var _ p2p.Wrapper = &HasVote{}
|
||||
var _ p2p.Wrapper = &BlockPart{}
|
||||
|
||||
func (m *VoteSetBits) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_VoteSetBits{VoteSetBits: m}
|
||||
return cm, nil
|
||||
|
||||
}
|
||||
|
||||
func (m *VoteSetMaj23) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_VoteSetMaj23{VoteSetMaj23: m}
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
func (m *HasVote) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_HasVote{HasVote: m}
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
func (m *Vote) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_Vote{Vote: m}
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
func (m *BlockPart) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_BlockPart{BlockPart: m}
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
func (m *ProposalPOL) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_ProposalPol{ProposalPol: m}
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
func (m *Proposal) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_Proposal{Proposal: m}
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
func (m *NewValidBlock) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_NewValidBlock{NewValidBlock: m}
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
func (m *NewRoundStep) Wrap() (proto.Message, error) {
|
||||
cm := &Message{}
|
||||
cm.Sum = &Message_NewRoundStep{NewRoundStep: m}
|
||||
return cm, nil
|
||||
}
|
||||
|
||||
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped consensus
|
||||
// proto message.
|
||||
func (m *Message) Unwrap() (proto.Message, error) {
|
||||
switch msg := m.Sum.(type) {
|
||||
case *Message_NewRoundStep:
|
||||
return m.GetNewRoundStep(), nil
|
||||
|
||||
case *Message_NewValidBlock:
|
||||
return m.GetNewValidBlock(), nil
|
||||
|
||||
case *Message_Proposal:
|
||||
return m.GetProposal(), nil
|
||||
|
||||
case *Message_ProposalPol:
|
||||
return m.GetProposalPol(), nil
|
||||
|
||||
case *Message_BlockPart:
|
||||
return m.GetBlockPart(), nil
|
||||
|
||||
case *Message_Vote:
|
||||
return m.GetVote(), nil
|
||||
|
||||
case *Message_HasVote:
|
||||
return m.GetHasVote(), nil
|
||||
|
||||
case *Message_VoteSetMaj23:
|
||||
return m.GetVoteSetMaj23(), nil
|
||||
|
||||
case *Message_VoteSetBits:
|
||||
return m.GetVoteSetBits(), nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message: %T", msg)
|
||||
}
|
||||
}
|
||||
30
proto/tendermint/mempool/message.go
Normal file
30
proto/tendermint/mempool/message.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package mempool
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
)
|
||||
|
||||
var _ p2p.Wrapper = &Txs{}
|
||||
var _ p2p.Unwrapper = &Message{}
|
||||
|
||||
// Wrap implements the p2p Wrapper interface and wraps a mempool message.
|
||||
func (m *Txs) Wrap() (proto.Message, error) {
|
||||
return &Message{
|
||||
Sum: &Message_Txs{Txs: m},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped mempool
|
||||
// message.
|
||||
func (m *Message) Unwrap() (proto.Message, error) {
|
||||
switch msg := m.Sum.(type) {
|
||||
case *Message_Txs:
|
||||
return m.GetTxs(), nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message: %T", msg)
|
||||
}
|
||||
}
|
||||
32
proto/tendermint/p2p/pex.go
Normal file
32
proto/tendermint/p2p/pex.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
)
|
||||
|
||||
func (m *PexAddrs) Wrap() (proto.Message, error) {
|
||||
pm := &Message{}
|
||||
pm.Sum = &Message_PexAddrs{PexAddrs: m}
|
||||
return pm, nil
|
||||
}
|
||||
|
||||
func (m *PexRequest) Wrap() (proto.Message, error) {
|
||||
pm := &Message{}
|
||||
pm.Sum = &Message_PexRequest{PexRequest: m}
|
||||
return pm, nil
|
||||
}
|
||||
|
||||
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped PEX
|
||||
// message.
|
||||
func (m *Message) Unwrap() (proto.Message, error) {
|
||||
switch msg := m.Sum.(type) {
|
||||
case *Message_PexRequest:
|
||||
return msg.PexRequest, nil
|
||||
case *Message_PexAddrs:
|
||||
return msg.PexAddrs, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown pex message: %T", msg)
|
||||
}
|
||||
}
|
||||
58
proto/tendermint/statesync/message.go
Normal file
58
proto/tendermint/statesync/message.go
Normal file
@@ -0,0 +1,58 @@
|
||||
package statesync
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
)
|
||||
|
||||
var _ p2p.Wrapper = &ChunkRequest{}
|
||||
var _ p2p.Wrapper = &ChunkResponse{}
|
||||
var _ p2p.Wrapper = &SnapshotsRequest{}
|
||||
var _ p2p.Wrapper = &SnapshotsResponse{}
|
||||
|
||||
func (m *SnapshotsResponse) Wrap() (proto.Message, error) {
|
||||
sm := &Message{}
|
||||
sm.Sum = &Message_SnapshotsResponse{SnapshotsResponse: m}
|
||||
return sm, nil
|
||||
}
|
||||
|
||||
func (m *SnapshotsRequest) Wrap() (proto.Message, error) {
|
||||
sm := &Message{}
|
||||
sm.Sum = &Message_SnapshotsRequest{SnapshotsRequest: m}
|
||||
return sm, nil
|
||||
}
|
||||
|
||||
func (m *ChunkResponse) Wrap() (proto.Message, error) {
|
||||
sm := &Message{}
|
||||
sm.Sum = &Message_ChunkResponse{ChunkResponse: m}
|
||||
return sm, nil
|
||||
}
|
||||
|
||||
func (m *ChunkRequest) Wrap() (proto.Message, error) {
|
||||
sm := &Message{}
|
||||
sm.Sum = &Message_ChunkRequest{ChunkRequest: m}
|
||||
return sm, nil
|
||||
}
|
||||
|
||||
// Unwrap implements the p2p Wrapper interface and unwraps a wrapped state sync
|
||||
// proto message.
|
||||
func (m *Message) Unwrap() (proto.Message, error) {
|
||||
switch msg := m.Sum.(type) {
|
||||
case *Message_ChunkRequest:
|
||||
return m.GetChunkRequest(), nil
|
||||
|
||||
case *Message_ChunkResponse:
|
||||
return m.GetChunkResponse(), nil
|
||||
|
||||
case *Message_SnapshotsRequest:
|
||||
return m.GetSnapshotsRequest(), nil
|
||||
|
||||
case *Message_SnapshotsResponse:
|
||||
return m.GetSnapshotsResponse(), nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message: %T", msg)
|
||||
}
|
||||
}
|
||||
@@ -131,13 +131,14 @@ object nset = { # the type of node sets
|
||||
object classic_bft = {
|
||||
relation quorum_intersection
|
||||
private {
|
||||
definition [quorum_intersection_def] quorum_intersection = forall Q1,Q2. exists N. well_behaved(N) & nset.member(N, Q1) & nset.member(N, Q2) # every two quorums have a well-behaved node in common
|
||||
definition [quorum_intersection_def] quorum_intersection = forall Q1,Q2. nset.is_quorum(Q1) & nset.is_quorum(Q2)
|
||||
-> exists N. well_behaved(N) & nset.member(N, Q1) & nset.member(N, Q2) # every two quorums have a well-behaved node in common
|
||||
}
|
||||
}
|
||||
|
||||
trusted isolate accountable_bft = {
|
||||
# this is our baseline assumption about quorums:
|
||||
private {
|
||||
property [max_2f_byzantine] exists N . well_behaved(N) & nset.member(N,Q) # every quorum has a well-behaved member
|
||||
property [max_2f_byzantine] nset.is_quorum(Q) -> exists N . well_behaved(N) & nset.member(N,Q) # every quorum has a well-behaved member
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/state/txindex"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
|
||||
// Register the Postgres database driver.
|
||||
@@ -196,6 +197,55 @@ func TestIndexing(t *testing.T) {
|
||||
err = indexer.IndexTxEvents([]*abci.TxResult{txResult})
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("IndexerService", func(t *testing.T) {
|
||||
indexer := &EventSink{store: testDB(), chainID: chainID}
|
||||
|
||||
// event bus
|
||||
eventBus := types.NewEventBus()
|
||||
err := eventBus.Start()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
if err := eventBus.Stop(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
|
||||
service := txindex.NewIndexerService(indexer.TxIndexer(), indexer.BlockIndexer(), eventBus, true)
|
||||
err = service.Start()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
if err := service.Stop(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
|
||||
// publish block with txs
|
||||
err = eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
|
||||
Header: types.Header{Height: 1},
|
||||
NumTxs: int64(2),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
txResult1 := &abci.TxResult{
|
||||
Height: 1,
|
||||
Index: uint32(0),
|
||||
Tx: types.Tx("foo"),
|
||||
Result: abci.ResponseDeliverTx{Code: 0},
|
||||
}
|
||||
err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult1})
|
||||
require.NoError(t, err)
|
||||
txResult2 := &abci.TxResult{
|
||||
Height: 1,
|
||||
Index: uint32(1),
|
||||
Tx: types.Tx("bar"),
|
||||
Result: abci.ResponseDeliverTx{Code: 1},
|
||||
}
|
||||
err = eventBus.PublishEventTx(types.EventDataTx{TxResult: *txResult2})
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
require.True(t, service.IsRunning())
|
||||
})
|
||||
}
|
||||
|
||||
func TestStop(t *testing.T) {
|
||||
|
||||
@@ -3,7 +3,6 @@ package txindex
|
||||
import (
|
||||
"context"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
"github.com/tendermint/tendermint/state/indexer"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
@@ -20,9 +19,10 @@ const (
|
||||
type IndexerService struct {
|
||||
service.BaseService
|
||||
|
||||
txIdxr TxIndexer
|
||||
blockIdxr indexer.BlockIndexer
|
||||
eventBus *types.EventBus
|
||||
txIdxr TxIndexer
|
||||
blockIdxr indexer.BlockIndexer
|
||||
eventBus *types.EventBus
|
||||
terminateOnError bool
|
||||
}
|
||||
|
||||
// NewIndexerService returns a new service instance.
|
||||
@@ -30,9 +30,10 @@ func NewIndexerService(
|
||||
txIdxr TxIndexer,
|
||||
blockIdxr indexer.BlockIndexer,
|
||||
eventBus *types.EventBus,
|
||||
terminateOnError bool,
|
||||
) *IndexerService {
|
||||
|
||||
is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus}
|
||||
is := &IndexerService{txIdxr: txIdxr, blockIdxr: blockIdxr, eventBus: eventBus, terminateOnError: terminateOnError}
|
||||
is.BaseService = *service.NewBaseService(nil, "IndexerService", is)
|
||||
return is
|
||||
}
|
||||
@@ -74,24 +75,38 @@ func (is *IndexerService) OnStart() error {
|
||||
"index", txResult.Index,
|
||||
"err", err,
|
||||
)
|
||||
|
||||
if is.terminateOnError {
|
||||
if err := is.Stop(); err != nil {
|
||||
is.Logger.Error("failed to stop", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := is.blockIdxr.Index(eventDataHeader); err != nil {
|
||||
is.Logger.Error("failed to index block", "height", height, "err", err)
|
||||
if is.terminateOnError {
|
||||
if err := is.Stop(); err != nil {
|
||||
is.Logger.Error("failed to stop", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
} else {
|
||||
is.Logger.Info("indexed block", "height", height)
|
||||
}
|
||||
|
||||
batch.Ops, err = DeduplicateBatch(batch.Ops, is.txIdxr)
|
||||
if err != nil {
|
||||
is.Logger.Error("deduplicate batch", "height", height)
|
||||
is.Logger.Info("indexed block exents", "height", height)
|
||||
}
|
||||
|
||||
if err = is.txIdxr.AddBatch(batch); err != nil {
|
||||
is.Logger.Error("failed to index block txs", "height", height, "err", err)
|
||||
if is.terminateOnError {
|
||||
if err := is.Stop(); err != nil {
|
||||
is.Logger.Error("failed to stop", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
} else {
|
||||
is.Logger.Debug("indexed block txs", "height", height, "num_txs", eventDataHeader.NumTxs)
|
||||
is.Logger.Debug("indexed transactions", "height", height, "num_txs", eventDataHeader.NumTxs)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -104,45 +119,3 @@ func (is *IndexerService) OnStop() {
|
||||
_ = is.eventBus.UnsubscribeAll(context.Background(), subscriber)
|
||||
}
|
||||
}
|
||||
|
||||
// DeduplicateBatch consider the case of duplicate txs.
|
||||
// if the current one under investigation is NOT OK, then we need to check
|
||||
// whether there's a previously indexed tx.
|
||||
// SKIP the current tx if the previously indexed record is found and successful.
|
||||
func DeduplicateBatch(ops []*abci.TxResult, txIdxr TxIndexer) ([]*abci.TxResult, error) {
|
||||
result := make([]*abci.TxResult, 0, len(ops))
|
||||
|
||||
// keep track of successful txs in this block in order to suppress latter ones being indexed.
|
||||
var successfulTxsInThisBlock = make(map[string]struct{})
|
||||
|
||||
for _, txResult := range ops {
|
||||
hash := types.Tx(txResult.Tx).Hash()
|
||||
|
||||
if txResult.Result.IsOK() {
|
||||
successfulTxsInThisBlock[string(hash)] = struct{}{}
|
||||
} else {
|
||||
// if it already appeared in current block and was successful, skip.
|
||||
if _, found := successfulTxsInThisBlock[string(hash)]; found {
|
||||
continue
|
||||
}
|
||||
|
||||
// check if this tx hash is already indexed
|
||||
old, err := txIdxr.Get(hash)
|
||||
|
||||
// if db op errored
|
||||
// Not found is not an error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// if it's already indexed in an older block and was successful, skip.
|
||||
if old != nil && old.Result.Code == abci.CodeTypeOK {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
result = append(result, txResult)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
|
||||
txIndexer := kv.NewTxIndex(store)
|
||||
blockIndexer := blockidxkv.New(db.NewPrefixDB(store, []byte("block_events")))
|
||||
|
||||
service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
|
||||
service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
|
||||
service.SetLogger(log.TestingLogger())
|
||||
err = service.Start()
|
||||
require.NoError(t, err)
|
||||
@@ -79,164 +79,3 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, txResult2, res)
|
||||
}
|
||||
|
||||
func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) {
|
||||
var mockTx = types.Tx("MOCK_TX_HASH")
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
tx1 abci.TxResult
|
||||
tx2 abci.TxResult
|
||||
expSkip bool // do we expect the second tx to be skipped by tx indexer
|
||||
}{
|
||||
{"skip, previously successful",
|
||||
abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK,
|
||||
},
|
||||
},
|
||||
abci.TxResult{
|
||||
Height: 2,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
true,
|
||||
},
|
||||
{"not skip, previously unsuccessful",
|
||||
abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
abci.TxResult{
|
||||
Height: 2,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
false,
|
||||
},
|
||||
{"not skip, both successful",
|
||||
abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK,
|
||||
},
|
||||
},
|
||||
abci.TxResult{
|
||||
Height: 2,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK,
|
||||
},
|
||||
},
|
||||
false,
|
||||
},
|
||||
{"not skip, both unsuccessful",
|
||||
abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
abci.TxResult{
|
||||
Height: 2,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
false,
|
||||
},
|
||||
{"skip, same block, previously successful",
|
||||
abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK,
|
||||
},
|
||||
},
|
||||
abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
true,
|
||||
},
|
||||
{"not skip, same block, previously unsuccessful",
|
||||
abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK,
|
||||
},
|
||||
},
|
||||
false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
indexer := kv.NewTxIndex(db.NewMemDB())
|
||||
|
||||
if tc.tx1.Height != tc.tx2.Height {
|
||||
// index the first tx
|
||||
err := indexer.AddBatch(&txindex.Batch{
|
||||
Ops: []*abci.TxResult{&tc.tx1},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// check if the second one should be skipped.
|
||||
ops, err := txindex.DeduplicateBatch([]*abci.TxResult{&tc.tx2}, indexer)
|
||||
require.NoError(t, err)
|
||||
|
||||
if tc.expSkip {
|
||||
require.Empty(t, ops)
|
||||
} else {
|
||||
require.Equal(t, []*abci.TxResult{&tc.tx2}, ops)
|
||||
}
|
||||
} else {
|
||||
// same block
|
||||
ops := []*abci.TxResult{&tc.tx1, &tc.tx2}
|
||||
ops, err := txindex.DeduplicateBatch(ops, indexer)
|
||||
require.NoError(t, err)
|
||||
if tc.expSkip {
|
||||
// the second one is skipped
|
||||
require.Equal(t, []*abci.TxResult{&tc.tx1}, ops)
|
||||
} else {
|
||||
require.Equal(t, []*abci.TxResult{&tc.tx1, &tc.tx2}, ops)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -102,12 +102,30 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
|
||||
// that indexed from the tx's events is a composite of the event type and the
|
||||
// respective attribute's key delimited by a "." (eg. "account.number").
|
||||
// Any event with an empty type is not indexed.
|
||||
//
|
||||
// If a transaction is indexed with the same hash as a previous transaction, it will
|
||||
// be overwritten unless the tx result was NOT OK and the prior result was OK i.e.
|
||||
// more transactions that successfully executed overwrite transactions that failed
|
||||
// or successful yet older transactions.
|
||||
func (txi *TxIndex) Index(result *abci.TxResult) error {
|
||||
b := txi.store.NewBatch()
|
||||
defer b.Close()
|
||||
|
||||
hash := types.Tx(result.Tx).Hash()
|
||||
|
||||
if !result.Result.IsOK() {
|
||||
oldResult, err := txi.Get(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// if the new transaction failed and it's already indexed in an older block and was successful
|
||||
// we skip it as we want users to get the older successful transaction when they query.
|
||||
if oldResult != nil && oldResult.Result.Code == abci.CodeTypeOK {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// index tx by events
|
||||
err := txi.indexEvents(result, hash, b)
|
||||
if err != nil {
|
||||
|
||||
@@ -258,6 +258,103 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTxIndexDuplicatePreviouslySuccessful(t *testing.T) {
|
||||
var mockTx = types.Tx("MOCK_TX_HASH")
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
tx1 *abci.TxResult
|
||||
tx2 *abci.TxResult
|
||||
expOverwrite bool // do we expect the second tx to overwrite the first tx
|
||||
}{
|
||||
{
|
||||
"don't overwrite as a non-zero code was returned and the previous tx was successful",
|
||||
&abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK,
|
||||
},
|
||||
},
|
||||
&abci.TxResult{
|
||||
Height: 2,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
false,
|
||||
},
|
||||
{
|
||||
"overwrite as the previous tx was also unsuccessful",
|
||||
&abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
&abci.TxResult{
|
||||
Height: 2,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK + 1,
|
||||
},
|
||||
},
|
||||
true,
|
||||
},
|
||||
{
|
||||
"overwrite as the most recent tx was successful",
|
||||
&abci.TxResult{
|
||||
Height: 1,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK,
|
||||
},
|
||||
},
|
||||
&abci.TxResult{
|
||||
Height: 2,
|
||||
Index: 0,
|
||||
Tx: mockTx,
|
||||
Result: abci.ResponseDeliverTx{
|
||||
Code: abci.CodeTypeOK,
|
||||
},
|
||||
},
|
||||
true,
|
||||
},
|
||||
}
|
||||
|
||||
hash := mockTx.Hash()
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
indexer := NewTxIndex(db.NewMemDB())
|
||||
|
||||
// index the first tx
|
||||
err := indexer.Index(tc.tx1)
|
||||
require.NoError(t, err)
|
||||
|
||||
// index the same tx with different results
|
||||
err = indexer.Index(tc.tx2)
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := indexer.Get(hash)
|
||||
require.NoError(t, err)
|
||||
|
||||
if tc.expOverwrite {
|
||||
require.Equal(t, tc.tx2, res)
|
||||
} else {
|
||||
require.Equal(t, tc.tx1, res)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTxSearchMultipleTxs(t *testing.T) {
|
||||
indexer := NewTxIndex(db.NewMemDB())
|
||||
|
||||
|
||||
@@ -16,49 +16,6 @@ const (
|
||||
chunkMsgSize = int(16e6)
|
||||
)
|
||||
|
||||
// mustEncodeMsg encodes a Protobuf message, panicing on error.
|
||||
func mustEncodeMsg(pb proto.Message) []byte {
|
||||
msg := ssproto.Message{}
|
||||
switch pb := pb.(type) {
|
||||
case *ssproto.ChunkRequest:
|
||||
msg.Sum = &ssproto.Message_ChunkRequest{ChunkRequest: pb}
|
||||
case *ssproto.ChunkResponse:
|
||||
msg.Sum = &ssproto.Message_ChunkResponse{ChunkResponse: pb}
|
||||
case *ssproto.SnapshotsRequest:
|
||||
msg.Sum = &ssproto.Message_SnapshotsRequest{SnapshotsRequest: pb}
|
||||
case *ssproto.SnapshotsResponse:
|
||||
msg.Sum = &ssproto.Message_SnapshotsResponse{SnapshotsResponse: pb}
|
||||
default:
|
||||
panic(fmt.Errorf("unknown message type %T", pb))
|
||||
}
|
||||
bz, err := msg.Marshal()
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("unable to marshal %T: %w", pb, err))
|
||||
}
|
||||
return bz
|
||||
}
|
||||
|
||||
// decodeMsg decodes a Protobuf message.
|
||||
func decodeMsg(bz []byte) (proto.Message, error) {
|
||||
pb := &ssproto.Message{}
|
||||
err := proto.Unmarshal(bz, pb)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch msg := pb.Sum.(type) {
|
||||
case *ssproto.Message_ChunkRequest:
|
||||
return msg.ChunkRequest, nil
|
||||
case *ssproto.Message_ChunkResponse:
|
||||
return msg.ChunkResponse, nil
|
||||
case *ssproto.Message_SnapshotsRequest:
|
||||
return msg.SnapshotsRequest, nil
|
||||
case *ssproto.Message_SnapshotsResponse:
|
||||
return msg.SnapshotsResponse, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message type %T", msg)
|
||||
}
|
||||
}
|
||||
|
||||
// validateMsg validates a message.
|
||||
func validateMsg(pb proto.Message) error {
|
||||
if pb == nil {
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
|
||||
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
|
||||
)
|
||||
@@ -99,8 +100,10 @@ func TestStateSyncVectors(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
tc := tc
|
||||
|
||||
bz := mustEncodeMsg(tc.msg)
|
||||
w, err := tc.msg.(p2p.Wrapper).Wrap()
|
||||
require.NoError(t, err)
|
||||
bz, err := proto.Marshal(w)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
|
||||
}
|
||||
|
||||
@@ -66,12 +66,14 @@ func (r *Reactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
Priority: 5,
|
||||
SendQueueCapacity: 10,
|
||||
RecvMessageCapacity: snapshotMsgSize,
|
||||
MessageType: &ssproto.Message{},
|
||||
},
|
||||
{
|
||||
ID: ChunkChannel,
|
||||
Priority: 3,
|
||||
SendQueueCapacity: 10,
|
||||
RecvMessageCapacity: chunkMsgSize,
|
||||
MessageType: &ssproto.Message{},
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -100,27 +102,21 @@ func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
}
|
||||
|
||||
// Receive implements p2p.Reactor.
|
||||
func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
func (r *Reactor) Receive(e p2p.Envelope) {
|
||||
if !r.IsRunning() {
|
||||
return
|
||||
}
|
||||
|
||||
msg, err := decodeMsg(msgBytes)
|
||||
err := validateMsg(e.Message)
|
||||
if err != nil {
|
||||
r.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
|
||||
r.Switch.StopPeerForError(src, err)
|
||||
return
|
||||
}
|
||||
err = validateMsg(msg)
|
||||
if err != nil {
|
||||
r.Logger.Error("Invalid message", "peer", src, "msg", msg, "err", err)
|
||||
r.Switch.StopPeerForError(src, err)
|
||||
r.Logger.Error("Invalid message", "peer", e.Src, "msg", e.Message, "err", err)
|
||||
r.Switch.StopPeerForError(e.Src, err)
|
||||
return
|
||||
}
|
||||
|
||||
switch chID {
|
||||
switch e.ChannelID {
|
||||
case SnapshotChannel:
|
||||
switch msg := msg.(type) {
|
||||
switch msg := e.Message.(type) {
|
||||
case *ssproto.SnapshotsRequest:
|
||||
snapshots, err := r.recentSnapshots(recentSnapshots)
|
||||
if err != nil {
|
||||
@@ -129,14 +125,17 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
}
|
||||
for _, snapshot := range snapshots {
|
||||
r.Logger.Debug("Advertising snapshot", "height", snapshot.Height,
|
||||
"format", snapshot.Format, "peer", src.ID())
|
||||
src.Send(chID, mustEncodeMsg(&ssproto.SnapshotsResponse{
|
||||
Height: snapshot.Height,
|
||||
Format: snapshot.Format,
|
||||
Chunks: snapshot.Chunks,
|
||||
Hash: snapshot.Hash,
|
||||
Metadata: snapshot.Metadata,
|
||||
}))
|
||||
"format", snapshot.Format, "peer", e.Src.ID())
|
||||
e.Src.Send(p2p.Envelope{
|
||||
ChannelID: e.ChannelID,
|
||||
Message: &ssproto.SnapshotsResponse{
|
||||
Height: snapshot.Height,
|
||||
Format: snapshot.Format,
|
||||
Chunks: snapshot.Chunks,
|
||||
Hash: snapshot.Hash,
|
||||
Metadata: snapshot.Metadata,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
case *ssproto.SnapshotsResponse:
|
||||
@@ -146,8 +145,8 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
r.Logger.Debug("Received unexpected snapshot, no state sync in progress")
|
||||
return
|
||||
}
|
||||
r.Logger.Debug("Received snapshot", "height", msg.Height, "format", msg.Format, "peer", src.ID())
|
||||
_, err := r.syncer.AddSnapshot(src, &snapshot{
|
||||
r.Logger.Debug("Received snapshot", "height", msg.Height, "format", msg.Format, "peer", e.Src.ID())
|
||||
_, err := r.syncer.AddSnapshot(e.Src, &snapshot{
|
||||
Height: msg.Height,
|
||||
Format: msg.Format,
|
||||
Chunks: msg.Chunks,
|
||||
@@ -157,7 +156,7 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
// TODO: We may want to consider punishing the peer for certain errors
|
||||
if err != nil {
|
||||
r.Logger.Error("Failed to add snapshot", "height", msg.Height, "format", msg.Format,
|
||||
"peer", src.ID(), "err", err)
|
||||
"peer", e.Src.ID(), "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -166,10 +165,10 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
}
|
||||
|
||||
case ChunkChannel:
|
||||
switch msg := msg.(type) {
|
||||
switch msg := e.Message.(type) {
|
||||
case *ssproto.ChunkRequest:
|
||||
r.Logger.Debug("Received chunk request", "height", msg.Height, "format", msg.Format,
|
||||
"chunk", msg.Index, "peer", src.ID())
|
||||
"chunk", msg.Index, "peer", e.Src.ID())
|
||||
resp, err := r.conn.LoadSnapshotChunkSync(abci.RequestLoadSnapshotChunk{
|
||||
Height: msg.Height,
|
||||
Format: msg.Format,
|
||||
@@ -181,30 +180,33 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
return
|
||||
}
|
||||
r.Logger.Debug("Sending chunk", "height", msg.Height, "format", msg.Format,
|
||||
"chunk", msg.Index, "peer", src.ID())
|
||||
src.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkResponse{
|
||||
Height: msg.Height,
|
||||
Format: msg.Format,
|
||||
Index: msg.Index,
|
||||
Chunk: resp.Chunk,
|
||||
Missing: resp.Chunk == nil,
|
||||
}))
|
||||
"chunk", msg.Index, "peer", e.Src.ID())
|
||||
e.Src.Send(p2p.Envelope{
|
||||
ChannelID: ChunkChannel,
|
||||
Message: &ssproto.ChunkResponse{
|
||||
Height: msg.Height,
|
||||
Format: msg.Format,
|
||||
Index: msg.Index,
|
||||
Chunk: resp.Chunk,
|
||||
Missing: resp.Chunk == nil,
|
||||
},
|
||||
})
|
||||
|
||||
case *ssproto.ChunkResponse:
|
||||
r.mtx.RLock()
|
||||
defer r.mtx.RUnlock()
|
||||
if r.syncer == nil {
|
||||
r.Logger.Debug("Received unexpected chunk, no state sync in progress", "peer", src.ID())
|
||||
r.Logger.Debug("Received unexpected chunk, no state sync in progress", "peer", e.Src.ID())
|
||||
return
|
||||
}
|
||||
r.Logger.Debug("Received chunk, adding to sync", "height", msg.Height, "format", msg.Format,
|
||||
"chunk", msg.Index, "peer", src.ID())
|
||||
"chunk", msg.Index, "peer", e.Src.ID())
|
||||
_, err := r.syncer.AddChunk(&chunk{
|
||||
Height: msg.Height,
|
||||
Format: msg.Format,
|
||||
Index: msg.Index,
|
||||
Chunk: msg.Chunk,
|
||||
Sender: src.ID(),
|
||||
Sender: e.Src.ID(),
|
||||
})
|
||||
if err != nil {
|
||||
r.Logger.Error("Failed to add chunk", "height", msg.Height, "format", msg.Format,
|
||||
@@ -217,7 +219,7 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
}
|
||||
|
||||
default:
|
||||
r.Logger.Error("Received message on invalid channel %x", chID)
|
||||
r.Logger.Error("Received message on invalid channel %x", e.ChannelID)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -269,7 +271,11 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
|
||||
hook := func() {
|
||||
r.Logger.Debug("Requesting snapshots from known peers")
|
||||
// Request snapshots from all currently connected peers
|
||||
r.Switch.Broadcast(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
|
||||
|
||||
r.Switch.NewBroadcast(p2p.Envelope{
|
||||
ChannelID: SnapshotChannel,
|
||||
Message: &ssproto.SnapshotsRequest{},
|
||||
})
|
||||
}
|
||||
|
||||
hook()
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/cosmos/gogoproto/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -53,10 +54,18 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
|
||||
peer.On("ID").Return(p2p.ID("id"))
|
||||
var response *ssproto.ChunkResponse
|
||||
if tc.expectResponse != nil {
|
||||
peer.On("Send", ChunkChannel, mock.Anything).Run(func(args mock.Arguments) {
|
||||
msg, err := decodeMsg(args[1].([]byte))
|
||||
peer.On("Send", mock.MatchedBy(func(i interface{}) bool {
|
||||
e, ok := i.(p2p.Envelope)
|
||||
return ok && e.ChannelID == ChunkChannel
|
||||
})).Run(func(args mock.Arguments) {
|
||||
e := args[0].(p2p.Envelope)
|
||||
|
||||
// Marshal to simulate a wire roundtrip.
|
||||
bz, err := proto.Marshal(e.Message)
|
||||
require.NoError(t, err)
|
||||
response = msg.(*ssproto.ChunkResponse)
|
||||
err = proto.Unmarshal(bz, e.Message)
|
||||
require.NoError(t, err)
|
||||
response = e.Message.(*ssproto.ChunkResponse)
|
||||
}).Return(true)
|
||||
}
|
||||
|
||||
@@ -71,7 +80,11 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
r.Receive(ChunkChannel, peer, mustEncodeMsg(tc.request))
|
||||
r.Receive(p2p.Envelope{
|
||||
ChannelID: ChunkChannel,
|
||||
Src: peer,
|
||||
Message: tc.request,
|
||||
})
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
assert.Equal(t, tc.expectResponse, response)
|
||||
|
||||
@@ -131,10 +144,18 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
|
||||
peer := &p2pmocks.Peer{}
|
||||
if len(tc.expectResponses) > 0 {
|
||||
peer.On("ID").Return(p2p.ID("id"))
|
||||
peer.On("Send", SnapshotChannel, mock.Anything).Run(func(args mock.Arguments) {
|
||||
msg, err := decodeMsg(args[1].([]byte))
|
||||
peer.On("Send", mock.MatchedBy(func(i interface{}) bool {
|
||||
e, ok := i.(p2p.Envelope)
|
||||
return ok && e.ChannelID == SnapshotChannel
|
||||
})).Run(func(args mock.Arguments) {
|
||||
e := args[0].(p2p.Envelope)
|
||||
|
||||
// Marshal to simulate a wire roundtrip.
|
||||
bz, err := proto.Marshal(e.Message)
|
||||
require.NoError(t, err)
|
||||
responses = append(responses, msg.(*ssproto.SnapshotsResponse))
|
||||
err = proto.Unmarshal(bz, e.Message)
|
||||
require.NoError(t, err)
|
||||
responses = append(responses, e.Message.(*ssproto.SnapshotsResponse))
|
||||
}).Return(true)
|
||||
}
|
||||
|
||||
@@ -149,7 +170,11 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
|
||||
}
|
||||
})
|
||||
|
||||
r.Receive(SnapshotChannel, peer, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
|
||||
r.Receive(p2p.Envelope{
|
||||
ChannelID: SnapshotChannel,
|
||||
Src: peer,
|
||||
Message: &ssproto.SnapshotsRequest{},
|
||||
})
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
assert.Equal(t, tc.expectResponses, responses)
|
||||
|
||||
|
||||
@@ -126,7 +126,11 @@ func (s *syncer) AddSnapshot(peer p2p.Peer, snapshot *snapshot) (bool, error) {
|
||||
// to discover snapshots, later we may want to do retries and stuff.
|
||||
func (s *syncer) AddPeer(peer p2p.Peer) {
|
||||
s.logger.Debug("Requesting snapshots from peer", "peer", peer.ID())
|
||||
peer.Send(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
|
||||
e := p2p.Envelope{
|
||||
ChannelID: SnapshotChannel,
|
||||
Message: &ssproto.SnapshotsRequest{},
|
||||
}
|
||||
peer.Send(e)
|
||||
}
|
||||
|
||||
// RemovePeer removes a peer from the pool.
|
||||
@@ -467,11 +471,14 @@ func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
|
||||
}
|
||||
s.logger.Debug("Requesting snapshot chunk", "height", snapshot.Height,
|
||||
"format", snapshot.Format, "chunk", chunk, "peer", peer.ID())
|
||||
peer.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkRequest{
|
||||
Height: snapshot.Height,
|
||||
Format: snapshot.Format,
|
||||
Index: chunk,
|
||||
}))
|
||||
peer.Send(p2p.Envelope{
|
||||
ChannelID: ChunkChannel,
|
||||
Message: &ssproto.ChunkRequest{
|
||||
Height: snapshot.Height,
|
||||
Format: snapshot.Format,
|
||||
Index: chunk,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// verifyApp verifies the sync, checking the app hash, last block height and app version
|
||||
|
||||
@@ -98,13 +98,27 @@ func TestSyncer_SyncAny(t *testing.T) {
|
||||
// Adding a couple of peers should trigger snapshot discovery messages
|
||||
peerA := &p2pmocks.Peer{}
|
||||
peerA.On("ID").Return(p2p.ID("a"))
|
||||
peerA.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true)
|
||||
peerA.On("Send", mock.MatchedBy(func(i interface{}) bool {
|
||||
e, ok := i.(p2p.Envelope)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
req, ok := e.Message.(*ssproto.SnapshotsRequest)
|
||||
return ok && e.ChannelID == SnapshotChannel && req != nil
|
||||
})).Return(true)
|
||||
syncer.AddPeer(peerA)
|
||||
peerA.AssertExpectations(t)
|
||||
|
||||
peerB := &p2pmocks.Peer{}
|
||||
peerB.On("ID").Return(p2p.ID("b"))
|
||||
peerB.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true)
|
||||
peerB.On("Send", mock.MatchedBy(func(i interface{}) bool {
|
||||
e, ok := i.(p2p.Envelope)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
req, ok := e.Message.(*ssproto.SnapshotsRequest)
|
||||
return ok && e.ChannelID == SnapshotChannel && req != nil
|
||||
})).Return(true)
|
||||
syncer.AddPeer(peerB)
|
||||
peerB.AssertExpectations(t)
|
||||
|
||||
@@ -147,9 +161,9 @@ func TestSyncer_SyncAny(t *testing.T) {
|
||||
chunkRequests := make(map[uint32]int)
|
||||
chunkRequestsMtx := tmsync.Mutex{}
|
||||
onChunkRequest := func(args mock.Arguments) {
|
||||
pb, err := decodeMsg(args[1].([]byte))
|
||||
require.NoError(t, err)
|
||||
msg := pb.(*ssproto.ChunkRequest)
|
||||
e, ok := args[0].(p2p.Envelope)
|
||||
require.True(t, ok)
|
||||
msg := e.Message.(*ssproto.ChunkRequest)
|
||||
require.EqualValues(t, 1, msg.Height)
|
||||
require.EqualValues(t, 1, msg.Format)
|
||||
require.LessOrEqual(t, msg.Index, uint32(len(chunks)))
|
||||
@@ -162,8 +176,14 @@ func TestSyncer_SyncAny(t *testing.T) {
|
||||
chunkRequests[msg.Index]++
|
||||
chunkRequestsMtx.Unlock()
|
||||
}
|
||||
peerA.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true)
|
||||
peerB.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true)
|
||||
peerA.On("Send", mock.MatchedBy(func(i interface{}) bool {
|
||||
e, ok := i.(p2p.Envelope)
|
||||
return ok && e.ChannelID == ChunkChannel
|
||||
})).Maybe().Run(onChunkRequest).Return(true)
|
||||
peerB.On("Send", mock.MatchedBy(func(i interface{}) bool {
|
||||
e, ok := i.(p2p.Envelope)
|
||||
return ok && e.ChannelID == ChunkChannel
|
||||
})).Maybe().Run(onChunkRequest).Return(true)
|
||||
|
||||
// The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1,
|
||||
// which should cause it to keep the existing chunk 0 and 2, and restart restoration from
|
||||
|
||||
@@ -87,7 +87,7 @@ func toCSVRecords(rs []report.Report) [][]string {
|
||||
}
|
||||
res := make([][]string, total+1)
|
||||
|
||||
res[0] = []string{"experiment_id", "duration_ns", "block_time", "connections", "rate", "size"}
|
||||
res[0] = []string{"experiment_id", "block_time", "duration_ns", "tx_hash", "connections", "rate", "size"}
|
||||
offset := 1
|
||||
for _, r := range rs {
|
||||
idStr := r.ID.String()
|
||||
@@ -95,7 +95,7 @@ func toCSVRecords(rs []report.Report) [][]string {
|
||||
rateStr := strconv.FormatInt(int64(r.Rate), 10)
|
||||
sizeStr := strconv.FormatInt(int64(r.Size), 10)
|
||||
for i, v := range r.All {
|
||||
res[offset+i] = []string{idStr, strconv.FormatInt(int64(v.Duration), 10), strconv.FormatInt(v.BlockTime.UnixNano(), 10), connStr, rateStr, sizeStr}
|
||||
res[offset+i] = []string{idStr, strconv.FormatInt(v.BlockTime.UnixNano(), 10), strconv.FormatInt(int64(v.Duration), 10), fmt.Sprintf("%X", v.Hash), connStr, rateStr, sizeStr}
|
||||
}
|
||||
offset += len(r.All)
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ package payload
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
)
|
||||
|
||||
const keyPrefix = "a="
|
||||
const maxPayloadSize = 4 * 1024 * 1024
|
||||
|
||||
// NewBytes generates a new payload and returns the encoded representation of
|
||||
// the payload as a slice of bytes. NewBytes uses the fields on the Options
|
||||
@@ -25,10 +26,16 @@ func NewBytes(p *Payload) ([]byte, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if p.Size < uint64(us) {
|
||||
return nil, fmt.Errorf("configured size %d not large enough to fit unpadded transaction of size %d", p.Size, us)
|
||||
if p.Size > maxPayloadSize {
|
||||
return nil, fmt.Errorf("configured size %d is too large (>%d)", p.Size, maxPayloadSize)
|
||||
}
|
||||
p.Padding = make([]byte, p.Size-uint64(us))
|
||||
pSize := int(p.Size) // #nosec -- The "if" above makes this cast safe
|
||||
if pSize < us {
|
||||
return nil, fmt.Errorf("configured size %d not large enough to fit unpadded transaction of size %d", pSize, us)
|
||||
}
|
||||
|
||||
// We halve the padding size because we transform the TX to hex
|
||||
p.Padding = make([]byte, (pSize-us)/2)
|
||||
_, err = rand.Read(p.Padding)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -37,22 +44,28 @@ func NewBytes(p *Payload) ([]byte, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
h := []byte(hex.EncodeToString(b))
|
||||
|
||||
// prepend a single key so that the kv store only ever stores a single
|
||||
// transaction instead of storing all tx and ballooning in size.
|
||||
return append([]byte(keyPrefix), b...), nil
|
||||
return append([]byte(keyPrefix), h...), nil
|
||||
}
|
||||
|
||||
// FromBytes extracts a paylod from the byte representation of the payload.
|
||||
// FromBytes leaves the padding untouched, returning it to the caller to handle
|
||||
// or discard per their preference.
|
||||
func FromBytes(b []byte) (*Payload, error) {
|
||||
p := &Payload{}
|
||||
tr := bytes.TrimPrefix(b, []byte(keyPrefix))
|
||||
if bytes.Equal(b, tr) {
|
||||
return nil, errors.New("payload bytes missing key prefix")
|
||||
trH := bytes.TrimPrefix(b, []byte(keyPrefix))
|
||||
if bytes.Equal(b, trH) {
|
||||
return nil, fmt.Errorf("payload bytes missing key prefix '%s'", keyPrefix)
|
||||
}
|
||||
err := proto.Unmarshal(tr, p)
|
||||
trB, err := hex.DecodeString(string(trH))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p := &Payload{}
|
||||
err = proto.Unmarshal(trB, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -83,5 +96,6 @@ func CalculateUnpaddedSize(p *Payload) (int, error) {
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return len(b) + len(keyPrefix), nil
|
||||
h := []byte(hex.EncodeToString(b))
|
||||
return len(h) + len(keyPrefix), nil
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ type BlockStore interface {
|
||||
type DataPoint struct {
|
||||
Duration time.Duration
|
||||
BlockTime time.Time
|
||||
Hash []byte
|
||||
}
|
||||
|
||||
// Report contains the data calculated from reading the timestamped transactions
|
||||
@@ -68,7 +69,7 @@ func (rs *Reports) ErrorCount() int {
|
||||
return rs.errorCount
|
||||
}
|
||||
|
||||
func (rs *Reports) addDataPoint(id uuid.UUID, l time.Duration, bt time.Time, conns, rate, size uint64) {
|
||||
func (rs *Reports) addDataPoint(id uuid.UUID, l time.Duration, bt time.Time, hash []byte, conns, rate, size uint64) {
|
||||
r, ok := rs.s[id]
|
||||
if !ok {
|
||||
r = Report{
|
||||
@@ -81,7 +82,7 @@ func (rs *Reports) addDataPoint(id uuid.UUID, l time.Duration, bt time.Time, con
|
||||
}
|
||||
rs.s[id] = r
|
||||
}
|
||||
r.All = append(r.All, DataPoint{Duration: l, BlockTime: bt})
|
||||
r.All = append(r.All, DataPoint{Duration: l, BlockTime: bt, Hash: hash})
|
||||
if l > r.Max {
|
||||
r.Max = l
|
||||
}
|
||||
@@ -123,11 +124,12 @@ func GenerateFromBlockStore(s BlockStore) (*Reports, error) {
|
||||
id uuid.UUID
|
||||
l time.Duration
|
||||
bt time.Time
|
||||
hash []byte
|
||||
connections, rate, size uint64
|
||||
err error
|
||||
}
|
||||
type txData struct {
|
||||
tx []byte
|
||||
tx types.Tx
|
||||
bt time.Time
|
||||
}
|
||||
reports := &Reports{
|
||||
@@ -161,6 +163,7 @@ func GenerateFromBlockStore(s BlockStore) (*Reports, error) {
|
||||
pdc <- payloadData{
|
||||
l: l,
|
||||
bt: b.bt,
|
||||
hash: b.tx.Hash(),
|
||||
id: uuid.UUID(*idb),
|
||||
connections: p.Connections,
|
||||
rate: p.Rate,
|
||||
@@ -202,7 +205,7 @@ func GenerateFromBlockStore(s BlockStore) (*Reports, error) {
|
||||
reports.addError()
|
||||
continue
|
||||
}
|
||||
reports.addDataPoint(pd.id, pd.l, pd.bt, pd.connections, pd.rate, pd.size)
|
||||
reports.addDataPoint(pd.id, pd.l, pd.bt, pd.hash, pd.connections, pd.rate, pd.size)
|
||||
}
|
||||
reports.calculateAll()
|
||||
return reports, nil
|
||||
|
||||
Reference in New Issue
Block a user