consensus package uses protos directly

This commit is contained in:
William Banfield
2022-10-20 18:21:28 -04:00
parent 9742dac312
commit 5c173cbb3d
5 changed files with 91 additions and 78 deletions

View File

@@ -26,6 +26,7 @@ import (
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/store"
@@ -166,13 +167,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
if i < len(peerList)/2 {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer)
peer.Send(p2p.Envelope{
Message: MustMsgToProto(&VoteMessage{prevote1}),
Message: &tmcons.Vote{prevote1.ToProto()},
ChannelID: VoteChannel,
})
} else {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer)
peer.Send(p2p.Envelope{
Message: MustMsgToProto(&VoteMessage{prevote2}),
Message: &tmcons.Vote{prevote2.ToProto()},
ChannelID: VoteChannel,
})
}
@@ -526,23 +527,25 @@ func sendProposalAndParts(
parts *types.PartSet,
) {
// proposal
msg := &ProposalMessage{Proposal: proposal}
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
Message: &tmcons.Proposal{Proposal: *proposal.ToProto()},
})
// parts
for i := 0; i < int(parts.Total()); i++ {
part := parts.GetPart(i)
msg := &BlockPartMessage{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: part,
pp, err := part.ToProto()
if err != nil {
panic(err) // TODO: wbanfield better error handling
}
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
Message: &tmcons.BlockPart{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: *pp,
},
})
}
@@ -553,11 +556,11 @@ func sendProposalAndParts(
cs.mtx.Unlock()
peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: MustMsgToProto(&VoteMessage{prevote}),
Message: &tmcons.Vote{prevote.ToProto()},
})
peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: MustMsgToProto(&VoteMessage{precommit}),
Message: &tmcons.Vote{precommit.ToProto()},
})
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/p2p"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"github.com/tendermint/tendermint/types"
)
@@ -95,7 +96,7 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw
for _, peer := range peers {
cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer)
peer.Send(p2p.Envelope{
Message: MustMsgToProto(&VoteMessage{precommit}),
Message: &tmcons.Vote{precommit.ToProto()},
ChannelID: VoteChannel,
})
}

View File

@@ -296,15 +296,18 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
default:
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
}
eMsg := &tmcons.VoteSetBits{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID.ToProto(),
}
if votes := ourVotes.ToProto(); votes != nil {
eMsg.Votes = *votes
}
e.Src.TrySend(p2p.Envelope{
ChannelID: VoteSetBitsChannel,
Message: MustMsgToProto(&VoteSetBitsMessage{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID,
Votes: ourVotes,
}),
Message: eMsg,
})
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
@@ -439,28 +442,28 @@ func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
nrsMsg := makeRoundStepMessage(rs)
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(nrsMsg),
Message: nrsMsg,
})
}
func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
csMsg := &NewValidBlockMessage{
psh := rs.ProposalBlockParts.Header()
csMsg := &tmcons.NewValidBlock{
Height: rs.Height,
Round: rs.Round,
BlockPartSetHeader: rs.ProposalBlockParts.Header(),
BlockParts: rs.ProposalBlockParts.BitArray(),
BlockPartSetHeader: psh.ToProto(),
BlockParts: rs.ProposalBlockParts.BitArray().ToProto(),
IsCommit: rs.Step == cstypes.RoundStepCommit,
}
MsgToProto(csMsg)
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(csMsg),
Message: csMsg,
})
}
// Broadcasts HasVoteMessage to peers that care.
func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
msg := &HasVoteMessage{
msg := &tmcons.HasVote{
Height: vote.Height,
Round: vote.Round,
Type: vote.Type,
@@ -468,7 +471,7 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
}
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(msg),
Message: msg,
})
/*
// TODO: Make this broadcast more selective.
@@ -494,11 +497,11 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
*/
}
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *NewRoundStepMessage) {
nrsMsg = &NewRoundStepMessage{
func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *tmcons.NewRoundStep) {
nrsMsg = &tmcons.NewRoundStep{
Height: rs.Height,
Round: rs.Round,
Step: rs.Step,
Step: uint32(rs.Step),
SecondsSinceStartTime: int64(time.Since(rs.StartTime).Seconds()),
LastCommitRound: rs.LastCommit.GetRound(),
}
@@ -510,7 +513,7 @@ func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
nrsMsg := makeRoundStepMessage(rs)
peer.Send(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(nrsMsg),
Message: nrsMsg,
})
}
@@ -550,15 +553,18 @@ OUTER_LOOP:
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) {
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
part := rs.ProposalBlockParts.GetPart(index)
msg := &BlockPartMessage{
Height: rs.Height, // This tells peer that this part applies to us.
Round: rs.Round, // This tells peer that this part applies to us.
Part: part,
parts, err := part.ToProto()
if err != nil {
panic(err)
}
logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
Message: &tmcons.BlockPart{
Height: rs.Height, // This tells peer that this part applies to us.
Round: rs.Round, // This tells peer that this part applies to us.
Part: *parts,
},
}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
@@ -605,11 +611,10 @@ OUTER_LOOP:
if rs.Proposal != nil && !prs.Proposal {
// Proposal: share the proposal metadata with peer.
{
msg := &ProposalMessage{Proposal: rs.Proposal}
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
Message: &tmcons.Proposal{Proposal: *rs.Proposal.ToProto()},
}) {
// NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected!
ps.SetHasProposal(rs.Proposal)
@@ -620,15 +625,14 @@ OUTER_LOOP:
// rs.Proposal was validated, so rs.Proposal.POLRound <= rs.Round,
// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
if 0 <= rs.Proposal.POLRound {
msg := &ProposalPOLMessage{
Height: rs.Height,
ProposalPOLRound: rs.Proposal.POLRound,
ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
}
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
Message: &tmcons.ProposalPOL{
Height: rs.Height,
ProposalPolRound: rs.Proposal.POLRound,
ProposalPol: *rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray().ToProto(),
},
})
}
continue OUTER_LOOP
@@ -666,15 +670,19 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
return
}
// Send the part
msg := &BlockPartMessage{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: part,
}
logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
pp, err := part.ToProto()
if err != nil {
logger.Error("Could not convert part to proto", "index", index, "error", err)
return
}
if peer.Send(p2p.Envelope{
ChannelID: DataChannel,
Message: MustMsgToProto(msg),
Message: &tmcons.BlockPart{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: *pp,
},
}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
} else {
@@ -837,12 +845,12 @@ OUTER_LOOP:
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(&VoteSetMaj23Message{
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrevoteType,
BlockID: maj23,
}),
BlockID: maj23.ToProto(),
},
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
@@ -855,15 +863,14 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(&VoteSetMaj23Message{
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.Round,
Type: tmproto.PrecommitType,
BlockID: maj23,
}),
BlockID: maj23.ToProto(),
},
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
@@ -879,12 +886,12 @@ OUTER_LOOP:
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(&VoteSetMaj23Message{
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.ProposalPOLRound,
Type: tmproto.PrevoteType,
BlockID: maj23,
}),
BlockID: maj23.ToProto(),
},
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
@@ -902,12 +909,12 @@ OUTER_LOOP:
if commit := conR.conS.LoadCommit(prs.Height); commit != nil {
peer.TrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: MustMsgToProto(&VoteSetMaj23Message{
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
Round: commit.Round,
Type: tmproto.PrecommitType,
BlockID: commit.BlockID,
}),
BlockID: commit.BlockID.ToProto(),
},
})
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
@@ -1122,11 +1129,12 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(votes); ok {
msg := &VoteMessage{vote}
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
if ps.peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Message: MustMsgToProto(msg),
Message: &tmcons.Vote{
Vote: vote.ToProto(),
},
}) {
ps.SetHasVote(vote)
return true

View File

@@ -33,6 +33,7 @@ import (
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/p2p"
p2pmock "github.com/tendermint/tendermint/p2p/mock"
tmcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
sm "github.com/tendermint/tendermint/state"
statemocks "github.com/tendermint/tendermint/state/mocks"
@@ -265,15 +266,18 @@ func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {
var (
reactor = reactors[0]
peer = p2pmock.NewPeer(nil)
msg = MustEncode(&HasVoteMessage{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType})
)
reactor.InitPeer(peer)
// simulate switch calling Receive before AddPeer
assert.NotPanics(t, func() {
reactor.Receive(StateChannel, peer, msg)
reactor.Receive(p2p.Envelope{
ChannelID: StateChannel,
Src: peer,
Message: &tmcons.HasVote{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType},
})
reactor.AddPeer(peer)
})
}
@@ -288,20 +292,17 @@ func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
var (
reactor = reactors[0]
peer = p2pmock.NewPeer(nil)
msg = MustEncode(&HasVoteMessage{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType})
)
// we should call InitPeer here
// simulate switch calling Receive before AddPeer
assert.Panics(t, func() {
reactor.Receive(StateChannel, peer, msg)
reactor.NewReceive(p2p.Envelope{
reactor.Receive(p2p.Envelope{
ChannelID: StateChannel,
Src: peer,
Message: MustMsgToProto(&HasVoteMessage{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType}),
Message: &tmcons.HasVote{Height: 1,
Round: 1, Index: 1, Type: tmproto.PrevoteType},
})
})
}

View File

@@ -19,7 +19,7 @@ func NewReactor() *Reactor {
return r
}
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
func (r *Reactor) AddPeer(peer p2p.Peer) {}
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {}
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
func (r *Reactor) AddPeer(peer p2p.Peer) {}
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
func (r *Reactor) Receive(e p2p.Envelope) {}