tests compile

This commit is contained in:
William Banfield
2022-10-19 12:28:39 -04:00
parent 5795877dbc
commit 01db5dc019
13 changed files with 174 additions and 67 deletions

View File

@@ -149,15 +149,14 @@ func (bcR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor by sending our state to peer.
func (bcR *Reactor) AddPeer(peer p2p.Peer) {
msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height()})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return
e := p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusResponse{
Base: bcR.store.Base(),
Height: bcR.store.Height(),
},
}
peer.Send(BlocksyncChannel, msgBytes)
peer.NewSend(e)
// it's OK if send fails. will try later in poolRoutine
// peer is added to the pool once we receive the first

View File

@@ -165,10 +165,22 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
for i, peer := range peerList {
if i < len(peerList)/2 {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote1}))
p, err := MsgToProto(&VoteMessage{prevote1})
require.NoError(t, err)
e := p2p.Envelope{
Message: p,
ChannelID: VoteChannel,
}
peer.NewSend(e)
} else {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote2}))
p, err := MsgToProto(&VoteMessage{prevote2})
require.NoError(t, err)
e := p2p.Envelope{
Message: p,
ChannelID: VoteChannel,
}
peer.NewSend(e)
}
}
} else {
@@ -521,7 +533,15 @@ func sendProposalAndParts(
) {
// proposal
msg := &ProposalMessage{Proposal: proposal}
peer.Send(DataChannel, MustEncode(msg))
p, err := MsgToProto(msg)
if err != nil {
panic(err)
}
e := p2p.Envelope{
ChannelID: DataChannel,
Message: p,
}
peer.NewSend(e)
// parts
for i := 0; i < int(parts.Total()); i++ {
@@ -531,7 +551,15 @@ func sendProposalAndParts(
Round: round, // This tells peer that this part applies to us.
Part: part,
}
peer.Send(DataChannel, MustEncode(msg))
p, err := MsgToProto(msg)
if err != nil {
panic(err)
}
e := p2p.Envelope{
ChannelID: DataChannel,
Message: p,
}
peer.NewSend(e)
}
// votes
@@ -539,9 +567,24 @@ func sendProposalAndParts(
prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header())
precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header())
cs.mtx.Unlock()
peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote}))
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
p, err = MsgToProto(&VoteMessage{prevote})
if err != nil {
panic(err)
}
e = p2p.Envelope{
ChannelID: VoteChannel,
Message: p,
}
peer.NewSend(e)
p, err = MsgToProto(&VoteMessage{precommit})
if err != nil {
panic(err)
}
e = p2p.Envelope{
ChannelID: VoteChannel,
Message: p,
}
peer.NewSend(e)
}
//----------------------------------------

View File

@@ -3,6 +3,7 @@ package consensus
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
@@ -94,7 +95,13 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw
peers := sw.Peers().List()
for _, peer := range peers {
cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer)
peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit}))
p, err := MsgToProto(&VoteMessage{precommit})
require.NoError(t, err)
e := p2p.Envelope{
Message: p,
ChannelID: VoteChannel,
}
peer.NewSend(e)
}
}()
}

View File

@@ -126,11 +126,15 @@ func (evR *Reactor) broadcastEvidenceRoutine(peer p2p.Peer) {
evis := evR.prepareEvidenceMessage(peer, ev)
if len(evis) > 0 {
evR.Logger.Debug("Gossiping evidence to peer", "ev", ev, "peer", peer)
msgBytes, err := encodeMsg(evis)
evp, err := evidenceListToProto(evis)
if err != nil {
panic(err)
}
success := peer.Send(EvidenceChannel, msgBytes)
e := p2p.Envelope{
ChannelID: EvidenceChannel,
Message: evp,
}
success := peer.NewSend(e)
if !success {
time.Sleep(peerRetryMessageIntervalMS * time.Millisecond)
continue
@@ -226,6 +230,23 @@ func encodeMsg(evis []types.Evidence) ([]byte, error) {
return epl.Marshal()
}
// encodemsg takes a array of evidence
// returns the byte encoding of the List Message
func evidenceListToProto(evis []types.Evidence) (*tmproto.EvidenceList, error) {
evi := make([]tmproto.Evidence, len(evis))
for i := 0; i < len(evis); i++ {
ev, err := types.EvidenceToProto(evis[i])
if err != nil {
return nil, err
}
evi[i] = *ev
}
epl := tmproto.EvidenceList{
Evidence: evi,
}
return &epl, nil
}
// decodemsg takes an array of bytes
// returns an array of evidence
func decodeMsg(bz []byte) (evis []types.Evidence, err error) {

View File

@@ -234,18 +234,15 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
// https://github.com/tendermint/tendermint/issues/5796
if _, ok := memTx.senders.Load(peerID); !ok {
msg := protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
e := p2p.Envelope{
ChannelID: mempool.MempoolChannel,
Message: &protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
},
},
}
bz, err := msg.Marshal()
if err != nil {
panic(err)
}
success := peer.Send(mempool.MempoolChannel, bz)
success := peer.NewSend(e)
if !success {
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
continue

View File

@@ -233,18 +233,17 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
// NOTE: Transaction batching was disabled due to
// https://github.com/tendermint/tendermint/issues/5796
if !memTx.HasPeer(peerID) {
msg := protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
e := p2p.Envelope{
ChannelID: mempool.MempoolChannel,
Message: &protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},
},
},
}
bz, err := msg.Marshal()
if err != nil {
panic(err)
}
success := peer.Send(mempool.MempoolChannel, bz)
success := peer.NewSend(e)
if !success {
time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond)
continue

View File

@@ -21,6 +21,8 @@ type mockPeer struct {
func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
func (mp *mockPeer) TrySend(chID byte, msgBytes []byte) bool { return true }
func (mp *mockPeer) Send(chID byte, msgBytes []byte) bool { return true }
func (mp *mockPeer) NewTrySend(e Envelope) bool { return true }
func (mp *mockPeer) NewSend(e Envelope) bool { return true }
func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} }
func (mp *mockPeer) Status() ConnectionStatus { return ConnectionStatus{} }
func (mp *mockPeer) ID() ID { return mp.id }

View File

@@ -348,7 +348,11 @@ func (r *Reactor) RequestAddrs(p Peer) {
}
r.Logger.Debug("Request addrs", "from", p)
r.requestsSent.Set(id, struct{}{})
p.Send(PexChannel, mustEncode(&tmp2p.PexRequest{}))
e := p2p.Envelope{
ChannelID: PexChannel,
Message: &tmp2p.PexRequest{},
}
p.NewSend(e)
}
// ReceiveAddrs adds the given addrs to the addrbook if theres an open
@@ -406,7 +410,11 @@ func (r *Reactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error {
// SendAddrs sends addrs to the peer.
func (r *Reactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) {
p.Send(PexChannel, mustEncode(&tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)}))
e := p2p.Envelope{
ChannelID: PexChannel,
Message: &tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)},
}
p.NewSend(e)
}
// SetEnsurePeersPeriod sets period to ensure peers connected.

View File

@@ -6,7 +6,6 @@ import (
"sync"
"time"
"github.com/cosmos/gogoproto/proto"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/cmap"
"github.com/tendermint/tendermint/libs/log"
@@ -293,11 +292,7 @@ func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
//
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
func (sw *Switch) NewBroadcast(e Envelope) chan bool {
msgBytes, err := proto.Marshal(e.Message)
if err != nil {
panic(err)
}
sw.Logger.Debug("Broadcast", "channel", e.ChannelID, "msgBytes", log.NewLazySprintf("%X", msgBytes))
sw.Logger.Debug("Broadcast", "channel", e.ChannelID)
peers := sw.peers.List()
var wg sync.WaitGroup
@@ -307,7 +302,7 @@ func (sw *Switch) NewBroadcast(e Envelope) chan bool {
for _, peer := range peers {
go func(p Peer) {
defer wg.Done()
success := p.Send(e.ChannelID, msgBytes)
success := p.NewSend(e)
successChan <- success
}(peer)
}

View File

@@ -429,7 +429,10 @@ func TestSwitchStopPeerForError(t *testing.T) {
// send messages to the peer from sw1
p := sw1.Peers().List()[0]
p.Send(0x1, []byte("here's a message to send"))
e := Envelope{
ChannelID: 0x1,
}
p.NewSend(e)
// stop sw2. this should cause the p to fail,
// which results in calling StopPeerForError internally

View File

@@ -38,6 +38,23 @@ func mustEncodeMsg(pb proto.Message) []byte {
return bz
}
func toWrappedProto(pb proto.Message) proto.Message {
msg := ssproto.Message{}
switch pb := pb.(type) {
case *ssproto.ChunkRequest:
msg.Sum = &ssproto.Message_ChunkRequest{ChunkRequest: pb}
case *ssproto.ChunkResponse:
msg.Sum = &ssproto.Message_ChunkResponse{ChunkResponse: pb}
case *ssproto.SnapshotsRequest:
msg.Sum = &ssproto.Message_SnapshotsRequest{SnapshotsRequest: pb}
case *ssproto.SnapshotsResponse:
msg.Sum = &ssproto.Message_SnapshotsResponse{SnapshotsResponse: pb}
default:
panic(fmt.Errorf("unknown message type %T", pb))
}
return &msg
}
// decodeMsg decodes a Protobuf message.
func decodeMsg(bz []byte) (proto.Message, error) {
pb := &ssproto.Message{}

View File

@@ -130,13 +130,17 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
for _, snapshot := range snapshots {
r.Logger.Debug("Advertising snapshot", "height", snapshot.Height,
"format", snapshot.Format, "peer", src.ID())
src.Send(chID, mustEncodeMsg(&ssproto.SnapshotsResponse{
Height: snapshot.Height,
Format: snapshot.Format,
Chunks: snapshot.Chunks,
Hash: snapshot.Hash,
Metadata: snapshot.Metadata,
}))
e := p2p.Envelope{
ChannelID: chID,
Message: &ssproto.SnapshotsResponse{
Height: snapshot.Height,
Format: snapshot.Format,
Chunks: snapshot.Chunks,
Hash: snapshot.Hash,
Metadata: snapshot.Metadata,
},
}
src.NewSend(e)
}
case *ssproto.SnapshotsResponse:
@@ -182,13 +186,17 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
}
r.Logger.Debug("Sending chunk", "height", msg.Height, "format", msg.Format,
"chunk", msg.Index, "peer", src.ID())
src.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkResponse{
Height: msg.Height,
Format: msg.Format,
Index: msg.Index,
Chunk: resp.Chunk,
Missing: resp.Chunk == nil,
}))
e := p2p.Envelope{
ChannelID: ChunkChannel,
Message: toWrappedProto(&ssproto.ChunkResponse{
Height: msg.Height,
Format: msg.Format,
Index: msg.Index,
Chunk: resp.Chunk,
Missing: resp.Chunk == nil,
}),
}
src.NewSend(e)
case *ssproto.ChunkResponse:
r.mtx.RLock()

View File

@@ -126,7 +126,11 @@ func (s *syncer) AddSnapshot(peer p2p.Peer, snapshot *snapshot) (bool, error) {
// to discover snapshots, later we may want to do retries and stuff.
func (s *syncer) AddPeer(peer p2p.Peer) {
s.logger.Debug("Requesting snapshots from peer", "peer", peer.ID())
peer.Send(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
e := p2p.Envelope{
ChannelID: SnapshotChannel,
Message: toWrappedProto(&ssproto.SnapshotsRequest{}),
}
peer.NewSend(e)
}
// RemovePeer removes a peer from the pool.
@@ -467,11 +471,15 @@ func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
}
s.logger.Debug("Requesting snapshot chunk", "height", snapshot.Height,
"format", snapshot.Format, "chunk", chunk, "peer", peer.ID())
peer.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkRequest{
Height: snapshot.Height,
Format: snapshot.Format,
Index: chunk,
}))
e := p2p.Envelope{
ChannelID: ChunkChannel,
Message: &ssproto.ChunkRequest{
Height: snapshot.Height,
Format: snapshot.Format,
Index: chunk,
},
}
peer.NewSend(e)
}
// verifyApp verifies the sync, checking the app hash, last block height and app version