all builds except fuzz

This commit is contained in:
William Banfield
2022-10-26 18:20:45 -04:00
parent 79a7d6d660
commit 89482f810c
23 changed files with 160 additions and 135 deletions

View File

@@ -150,7 +150,7 @@ func (bcR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor by sending our state to peer.
func (bcR *Reactor) AddPeer(peer p2p.Peer) {
peer.Send(p2p.Envelope{
peer.NewSend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusResponse{
Base: bcR.store.Base(),
@@ -181,21 +181,21 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
return false
}
return src.TrySend(p2p.Envelope{
return src.NewTrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.BlockResponse{Block: bl},
})
}
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
return src.TrySend(p2p.Envelope{
return src.NewTrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.NoBlockResponse{Height: msg.Height},
})
}
// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *Reactor) Receive(e p2p.Envelope) {
func (bcR *Reactor) NewReceive(e p2p.Envelope) {
if err := ValidateMsg(e.Message); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
bcR.Switch.StopPeerForError(e.Src, err)
@@ -216,7 +216,7 @@ func (bcR *Reactor) Receive(e p2p.Envelope) {
bcR.pool.AddBlock(e.Src.ID(), bi, msg.Block.Size())
case *bcproto.StatusRequest:
// Send peer our state.
e.Src.TrySend(p2p.Envelope{
e.Src.NewTrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusResponse{
Height: bcR.store.Height(),
@@ -268,7 +268,7 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {
if peer == nil {
continue
}
queued := peer.TrySend(p2p.Envelope{
queued := peer.NewTrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.BlockRequest{Height: request.Height},
})
@@ -410,7 +410,7 @@ FOR_LOOP:
// BroadcastStatusRequest broadcasts `BlockStore` base and height.
func (bcR *Reactor) BroadcastStatusRequest() {
bcR.Switch.Broadcast(p2p.Envelope{
bcR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: &bcproto.StatusRequest{},
})

View File

@@ -166,13 +166,13 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
for i, peer := range peerList {
if i < len(peerList)/2 {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer)
peer.Send(p2p.Envelope{
peer.NewSend(p2p.Envelope{
Message: &tmcons.Vote{Vote: prevote1.ToProto()},
ChannelID: VoteChannel,
})
} else {
bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer)
peer.Send(p2p.Envelope{
peer.NewSend(p2p.Envelope{
Message: &tmcons.Vote{Vote: prevote2.ToProto()},
ChannelID: VoteChannel,
})
@@ -527,7 +527,7 @@ func sendProposalAndParts(
parts *types.PartSet,
) {
// proposal
peer.Send(p2p.Envelope{
peer.NewSend(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.Proposal{Proposal: *proposal.ToProto()},
})
@@ -539,7 +539,7 @@ func sendProposalAndParts(
if err != nil {
panic(err) // TODO: wbanfield better error handling
}
peer.Send(p2p.Envelope{
peer.NewSend(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.BlockPart{
Height: height, // This tells peer that this part applies to us.
@@ -554,11 +554,11 @@ func sendProposalAndParts(
prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header())
precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header())
cs.mtx.Unlock()
peer.Send(p2p.Envelope{
peer.NewSend(p2p.Envelope{
ChannelID: VoteChannel,
Message: &tmcons.Vote{Vote: prevote.ToProto()},
})
peer.Send(p2p.Envelope{
peer.NewSend(p2p.Envelope{
ChannelID: VoteChannel,
Message: &tmcons.Vote{Vote: precommit.ToProto()},
})
@@ -599,7 +599,7 @@ func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
br.reactor.RemovePeer(peer, reason)
}
func (br *ByzantineReactor) Receive(e p2p.Envelope) {
br.reactor.Receive(e)
func (br *ByzantineReactor) NewReceive(e p2p.Envelope) {
br.reactor.NewReceive(e)
}
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }

View File

@@ -95,7 +95,7 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw
peers := sw.Peers().List()
for _, peer := range peers {
cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer)
peer.Send(p2p.Envelope{
peer.NewSend(p2p.Envelope{
Message: &tmcons.Vote{Vote: precommit.ToProto()},
ChannelID: VoteChannel,
})

View File

@@ -225,7 +225,7 @@ func (conR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Peer state updates can happen in parallel, but processing of
// proposals, block parts, and votes are ordered by the receiveRoutine
// NOTE: blocks on consensus state for proposals, block parts, and votes
func (conR *Reactor) Receive(e p2p.Envelope) {
func (conR *Reactor) NewReceive(e p2p.Envelope) {
if !conR.IsRunning() {
conR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID)
return
@@ -302,7 +302,7 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
if votes := ourVotes.ToProto(); votes != nil {
eMsg.Votes = *votes
}
e.Src.TrySend(p2p.Envelope{
e.Src.NewTrySend(p2p.Envelope{
ChannelID: VoteSetBitsChannel,
Message: eMsg,
})
@@ -437,7 +437,7 @@ func (conR *Reactor) unsubscribeFromBroadcastEvents() {
func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
nrsMsg := makeRoundStepMessage(rs)
conR.Switch.Broadcast(p2p.Envelope{
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: nrsMsg,
})
@@ -452,7 +452,7 @@ func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
BlockParts: rs.ProposalBlockParts.BitArray().ToProto(),
IsCommit: rs.Step == cstypes.RoundStepCommit,
}
conR.Switch.Broadcast(p2p.Envelope{
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: csMsg,
})
@@ -466,7 +466,7 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
Type: vote.Type,
Index: vote.ValidatorIndex,
}
conR.Switch.Broadcast(p2p.Envelope{
conR.Switch.NewBroadcast(p2p.Envelope{
ChannelID: StateChannel,
Message: msg,
})
@@ -484,7 +484,7 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
ChannelID: StateChannel, struct{ ConsensusMessage }{msg},
Message: p,
}
peer.TrySend(e)
peer.NewTrySend(e)
} else {
// Height doesn't match
// TODO: check a field, maybe CatchupCommitRound?
@@ -508,7 +508,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *tmcons.NewRoundStep)
func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
rs := conR.getRoundState()
nrsMsg := makeRoundStepMessage(rs)
peer.Send(p2p.Envelope{
peer.NewSend(p2p.Envelope{
ChannelID: StateChannel,
Message: nrsMsg,
})
@@ -555,7 +555,7 @@ OUTER_LOOP:
panic(err)
}
logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
if peer.Send(p2p.Envelope{
if peer.NewSend(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.BlockPart{
Height: rs.Height, // This tells peer that this part applies to us.
@@ -609,7 +609,7 @@ OUTER_LOOP:
// Proposal: share the proposal metadata with peer.
{
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
if peer.Send(p2p.Envelope{
if peer.NewSend(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.Proposal{Proposal: *rs.Proposal.ToProto()},
}) {
@@ -623,7 +623,7 @@ OUTER_LOOP:
// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
if 0 <= rs.Proposal.POLRound {
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
peer.Send(p2p.Envelope{
peer.NewSend(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.ProposalPOL{
Height: rs.Height,
@@ -673,7 +673,7 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
logger.Error("Could not convert part to proto", "index", index, "error", err)
return
}
if peer.Send(p2p.Envelope{
if peer.NewSend(p2p.Envelope{
ChannelID: DataChannel,
Message: &tmcons.BlockPart{
Height: prs.Height, // Not our height, so it doesn't matter.
@@ -840,7 +840,7 @@ OUTER_LOOP:
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(p2p.Envelope{
peer.NewTrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
@@ -860,7 +860,7 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(p2p.Envelope{
peer.NewTrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
@@ -881,7 +881,7 @@ OUTER_LOOP:
if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
peer.TrySend(p2p.Envelope{
peer.NewTrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
@@ -904,7 +904,7 @@ OUTER_LOOP:
if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() &&
prs.Height >= conR.conS.blockStore.Base() {
if commit := conR.conS.LoadCommit(prs.Height); commit != nil {
peer.TrySend(p2p.Envelope{
peer.NewTrySend(p2p.Envelope{
ChannelID: StateChannel,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
@@ -1127,7 +1127,7 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(votes); ok {
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
if ps.peer.Send(p2p.Envelope{
if ps.peer.NewSend(p2p.Envelope{
ChannelID: VoteChannel,
Message: &tmcons.Vote{
Vote: vote.ToProto(),

View File

@@ -272,7 +272,7 @@ func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) {
// simulate switch calling Receive before AddPeer
assert.NotPanics(t, func() {
reactor.Receive(p2p.Envelope{
reactor.NewReceive(p2p.Envelope{
ChannelID: StateChannel,
Src: peer,
Message: &tmcons.HasVote{Height: 1,
@@ -298,7 +298,7 @@ func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) {
// simulate switch calling Receive before AddPeer
assert.Panics(t, func() {
reactor.Receive(p2p.Envelope{
reactor.NewReceive(p2p.Envelope{
ChannelID: StateChannel,
Src: peer,
Message: &tmcons.HasVote{Height: 1,

View File

@@ -68,7 +68,7 @@ func (evR *Reactor) AddPeer(peer p2p.Peer) {
// Receive implements Reactor.
// It adds any received evidence to the evpool.
func (evR *Reactor) Receive(e p2p.Envelope) {
func (evR *Reactor) NewReceive(e p2p.Envelope) {
evis, err := evidenceListFromProto(e.Message)
if err != nil {
evR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
@@ -133,7 +133,7 @@ func (evR *Reactor) broadcastEvidenceRoutine(peer p2p.Peer) {
panic(err)
}
success := peer.Send(p2p.Envelope{
success := peer.NewSend(p2p.Envelope{
ChannelID: EvidenceChannel,
Message: evp,
})

View File

@@ -155,7 +155,7 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *Reactor) Receive(e p2p.Envelope) {
func (memR *Reactor) NewReceive(e p2p.Envelope) {
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
switch msg := e.Message.(type) {
case *protomem.Txs:
@@ -242,7 +242,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
// https://github.com/tendermint/tendermint/issues/5796
if _, ok := memTx.senders.Load(peerID); !ok {
success := peer.Send(p2p.Envelope{
success := peer.NewSend(p2p.Envelope{
ChannelID: mempool.MempoolChannel,
Message: &protomem.Txs{Txs: [][]byte{memTx.tx}},
})

View File

@@ -283,7 +283,7 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) {
for i := 0; i < mempool.MaxActiveIDs+1; i++ {
peer := mock.NewPeer(nil)
reactor.Receive(p2p.Envelope{
reactor.NewReceive(p2p.Envelope{
ChannelID: mempool.MempoolChannel,
Src: peer,
Message: &memproto.Message{}, // This uses the wrong message type on purpose to stop the peer as in an error state in the reactor.

View File

@@ -154,7 +154,7 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// Receive implements Reactor.
// It adds any received transactions to the mempool.
func (memR *Reactor) Receive(e p2p.Envelope) {
func (memR *Reactor) NewReceive(e p2p.Envelope) {
memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
switch msg := e.Message.(type) {
case *protomem.Txs:
@@ -243,7 +243,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
// NOTE: Transaction batching was disabled due to
// https://github.com/tendermint/tendermint/issues/5796
if !memTx.HasPeer(peerID) {
success := peer.Send(p2p.Envelope{
success := peer.NewSend(p2p.Envelope{
ChannelID: mempool.MempoolChannel,
Message: &protomem.Txs{Txs: [][]byte{memTx.tx}},
})

View File

@@ -40,7 +40,7 @@ type Reactor interface {
// Receive is called by the switch when an envelope is received from any connected
// peer on any of the channels registered by the reactor
Receive(Envelope)
NewReceive(Envelope)
}
//--------------------------------------
@@ -63,5 +63,5 @@ func (br *BaseReactor) SetSwitch(sw *Switch) {
func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil }
func (*BaseReactor) AddPeer(peer Peer) {}
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (*BaseReactor) Receive(e Envelope) {}
func (*BaseReactor) NewReceive(e Envelope) {}
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }

View File

@@ -42,9 +42,9 @@ func NewPeer(ip net.IP) *Peer {
return mp
}
func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error
func (mp *Peer) TrySend(e p2p.Envelope) bool { return true }
func (mp *Peer) Send(e p2p.Envelope) bool { return true }
func (mp *Peer) FlushStop() { mp.Stop() } //nolint:errcheck //ignore error
func (mp *Peer) NewTrySend(e p2p.Envelope) bool { return true }
func (mp *Peer) NewSend(e p2p.Envelope) bool { return true }
func (mp *Peer) NodeInfo() p2p.NodeInfo {
return p2p.DefaultNodeInfo{
DefaultNodeID: mp.addr.ID,

View File

@@ -22,4 +22,4 @@ func NewReactor() *Reactor {
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
func (r *Reactor) AddPeer(peer p2p.Peer) {}
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
func (r *Reactor) Receive(e p2p.Envelope) {}
func (r *Reactor) NewReceive(e p2p.Envelope) {}

View File

@@ -109,6 +109,34 @@ func (_m *Peer) IsRunning() bool {
return r0
}
// NewSend provides a mock function with given fields: _a0
func (_m *Peer) NewSend(_a0 p2p.Envelope) bool {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(p2p.Envelope) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// NewTrySend provides a mock function with given fields: _a0
func (_m *Peer) NewTrySend(_a0 p2p.Envelope) bool {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(p2p.Envelope) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// NodeInfo provides a mock function with given fields:
func (_m *Peer) NodeInfo() p2p.NodeInfo {
ret := _m.Called()
@@ -220,20 +248,6 @@ func (_m *Peer) Reset() error {
return r0
}
// Send provides a mock function with given fields: _a0
func (_m *Peer) Send(_a0 p2p.Envelope) bool {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(p2p.Envelope) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
// Set provides a mock function with given fields: _a0, _a1
func (_m *Peer) Set(_a0 string, _a1 interface{}) {
_m.Called(_a0, _a1)
@@ -316,20 +330,6 @@ func (_m *Peer) String() string {
return r0
}
// TrySend provides a mock function with given fields: _a0
func (_m *Peer) TrySend(_a0 p2p.Envelope) bool {
ret := _m.Called(_a0)
var r0 bool
if rf, ok := ret.Get(0).(func(p2p.Envelope) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
return r0
}
type mockConstructorTestingTNewPeer interface {
mock.TestingT
Cleanup(func())

View File

@@ -37,8 +37,8 @@ type Peer interface {
Status() tmconn.ConnectionStatus
SocketAddr() *NetAddress // actual address of the socket
Send(Envelope) bool
TrySend(Envelope) bool
NewSend(Envelope) bool
NewTrySend(Envelope) bool
Set(string, interface{})
Get(string) interface{}
@@ -196,7 +196,7 @@ func (p *peer) OnStart() error {
}
// FlushStop mimics OnStop but additionally ensures that all successful
// .Send() calls will get flushed before closing the connection.
// NewSend() calls will get flushed before closing the connection.
// NOTE: it is not safe to call this method more than once.
func (p *peer) FlushStop() {
p.metricsTicker.Stop()
@@ -251,22 +251,15 @@ func (p *peer) Status() tmconn.ConnectionStatus {
// Send msg bytes to the channel identified by chID byte. Returns false if the
// send queue is full after timeout, specified by MConnection.
func (p *peer) Send(e Envelope) bool {
return p.send(e.ChannelID, e.Message, p.mconn.Send)
}
// TrySend msg bytes to the channel identified by chID byte. Immediately returns
// false if the send queue is full.
func (p *peer) TrySend(e Envelope) bool {
return p.send(e.ChannelID, e.Message, p.mconn.TrySend)
}
func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bool) bool {
func (p *peer) NewSend(e Envelope) bool {
if !p.IsRunning() {
// see Switch#Broadcast, where we fetch the list of peers and loop over
// them - while we're looping, one peer may be removed and stopped.
return false
} else if !p.hasChannel(chID) {
} else if !p.hasChannel(e.ChannelID) {
return false
}
msg := e.Message
metricLabelValue := p.mlc.ValueToMetricLabel(msg)
if w, ok := msg.(Wrapper); ok {
msg = w.Wrap()
@@ -276,11 +269,43 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo
p.Logger.Error("marshaling message to send", "error", err)
return false
}
res := sendFunc(chID, msgBytes)
res := p.mconn.Send(e.ChannelID, msgBytes)
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
"chID", fmt.Sprintf("%#x", e.ChannelID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes)))
}
return res
}
// NewTrySend msg bytes to the channel identified by chID byte. Immediately returns
// false if the send queue is full.
func (p *peer) NewTrySend(e Envelope) bool {
if !p.IsRunning() {
// see Switch#Broadcast, where we fetch the list of peers and loop over
// them - while we're looping, one peer may be removed and stopped.
return false
} else if !p.hasChannel(e.ChannelID) {
return false
}
msg := e.Message
metricLabelValue := p.mlc.ValueToMetricLabel(msg)
if w, ok := msg.(Wrapper); ok {
msg = w.Wrap()
}
msgBytes, err := proto.Marshal(msg)
if err != nil {
p.Logger.Error("marshaling message to send", "error", err)
return false
}
res := p.mconn.TrySend(e.ChannelID, msgBytes)
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", e.ChannelID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes)))
@@ -408,7 +433,7 @@ func createMConnection(
}
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
p.metrics.MessageReceiveBytesTotal.With("message_type", p.mlc.ValueToMetricLabel(msg)).Add(float64(len(msgBytes)))
reactor.Receive(Envelope{
reactor.NewReceive(Envelope{
ChannelID: chID,
Src: p,
Message: msg,

View File

@@ -18,20 +18,20 @@ type mockPeer struct {
id ID
}
func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
func (mp *mockPeer) TrySend(e Envelope) bool { return true }
func (mp *mockPeer) Send(e Envelope) bool { return true }
func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} }
func (mp *mockPeer) Status() ConnectionStatus { return ConnectionStatus{} }
func (mp *mockPeer) ID() ID { return mp.id }
func (mp *mockPeer) IsOutbound() bool { return false }
func (mp *mockPeer) IsPersistent() bool { return true }
func (mp *mockPeer) Get(s string) interface{} { return s }
func (mp *mockPeer) Set(string, interface{}) {}
func (mp *mockPeer) RemoteIP() net.IP { return mp.ip }
func (mp *mockPeer) SocketAddr() *NetAddress { return nil }
func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
func (mp *mockPeer) CloseConn() error { return nil }
func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error
func (mp *mockPeer) NewTrySend(e Envelope) bool { return true }
func (mp *mockPeer) NewSend(e Envelope) bool { return true }
func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} }
func (mp *mockPeer) Status() ConnectionStatus { return ConnectionStatus{} }
func (mp *mockPeer) ID() ID { return mp.id }
func (mp *mockPeer) IsOutbound() bool { return false }
func (mp *mockPeer) IsPersistent() bool { return true }
func (mp *mockPeer) Get(s string) interface{} { return s }
func (mp *mockPeer) Set(string, interface{}) {}
func (mp *mockPeer) RemoteIP() net.IP { return mp.ip }
func (mp *mockPeer) SocketAddr() *NetAddress { return nil }
func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
func (mp *mockPeer) CloseConn() error { return nil }
// Returns a mock peer
func newMockPeer(ip net.IP) *mockPeer {

View File

@@ -72,7 +72,7 @@ func TestPeerSend(t *testing.T) {
})
assert.True(p.CanSend(testCh))
assert.True(p.Send(Envelope{ChannelID: testCh, Message: &p2p.Message{}}))
assert.True(p.NewSend(Envelope{ChannelID: testCh, Message: &p2p.Message{}}))
}
func createOutboundPeerAndPerformHandshake(

View File

@@ -235,7 +235,7 @@ func (r *Reactor) logErrAddrBook(err error) {
}
// Receive implements Reactor by handling incoming PEX messages.
func (r *Reactor) Receive(e p2p.Envelope) {
func (r *Reactor) NewReceive(e p2p.Envelope) {
r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg", e.Message)
switch msg := e.Message.(type) {
@@ -341,7 +341,7 @@ func (r *Reactor) RequestAddrs(p Peer) {
}
r.Logger.Debug("Request addrs", "from", p)
r.requestsSent.Set(id, struct{}{})
p.Send(p2p.Envelope{
p.NewSend(p2p.Envelope{
ChannelID: PexChannel,
Message: &tmp2p.PexRequest{},
})
@@ -406,7 +406,7 @@ func (r *Reactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) {
ChannelID: PexChannel,
Message: &tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)},
}
p.Send(e)
p.NewSend(e)
}
// SetEnsurePeersPeriod sets period to ensure peers connected.

View File

@@ -132,10 +132,10 @@ func TestPEXReactorReceive(t *testing.T) {
size := book.Size()
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
assert.Equal(t, size+1, book.Size())
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
}
func TestPEXReactorRequestMessageAbuse(t *testing.T) {
@@ -156,17 +156,17 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) {
id := string(peer.ID())
// first time creates the entry
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
assert.True(t, r.lastReceivedRequests.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// next time sets the last time value
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
assert.True(t, r.lastReceivedRequests.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// third time is too many too soon - peer is removed
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: &tmp2p.PexRequest{}})
assert.False(t, r.lastReceivedRequests.Has(id))
assert.False(t, sw.Peers().Has(peer.ID()))
assert.True(t, book.IsBanned(peerAddr))
@@ -193,12 +193,12 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
// receive some addrs. should clear the request
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
assert.False(t, r.requestsSent.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// receiving more unsolicited addrs causes a disconnect and ban
r.Receive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: msg})
assert.False(t, sw.Peers().Has(peer.ID()))
assert.True(t, book.IsBanned(peer.SocketAddr()))
}
@@ -485,7 +485,7 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
size := book.Size()
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
pexR.Receive(p2p.Envelope{
pexR.NewReceive(p2p.Envelope{
ChannelID: PexChannel,
Src: peer,
Message: msg,

View File

@@ -268,7 +268,7 @@ func (sw *Switch) OnStop() {
// closed once msg bytes are sent to all peers (or time out).
//
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
func (sw *Switch) Broadcast(e Envelope) chan bool {
func (sw *Switch) NewBroadcast(e Envelope) chan bool {
sw.Logger.Debug("Broadcast", "channel", e.ChannelID)
peers := sw.peers.List()
@@ -279,7 +279,7 @@ func (sw *Switch) Broadcast(e Envelope) chan bool {
for _, peer := range peers {
go func(p Peer) {
defer wg.Done()
success := p.Send(e)
success := p.NewSend(e)
successChan <- success
}(peer)
}

View File

@@ -71,7 +71,7 @@ func (tr *TestReactor) AddPeer(peer Peer) {}
func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {}
func (tr *TestReactor) Receive(e Envelope) {
func (tr *TestReactor) NewReceive(e Envelope) {
if tr.logMessages {
tr.mtx.Lock()
defer tr.mtx.Unlock()
@@ -157,9 +157,9 @@ func TestSwitches(t *testing.T) {
},
},
}
s1.Broadcast(Envelope{ChannelID: byte(0x00), Message: ch0Msg})
s1.Broadcast(Envelope{ChannelID: byte(0x01), Message: ch1Msg})
s1.Broadcast(Envelope{ChannelID: byte(0x02), Message: ch2Msg})
s1.NewBroadcast(Envelope{ChannelID: byte(0x00), Message: ch0Msg})
s1.NewBroadcast(Envelope{ChannelID: byte(0x01), Message: ch1Msg})
s1.NewBroadcast(Envelope{ChannelID: byte(0x02), Message: ch2Msg})
assertMsgReceivedWithTimeout(t,
ch0Msg,
byte(0x00),
@@ -450,7 +450,7 @@ func TestSwitchStopPeerForError(t *testing.T) {
// send messages to the peer from sw1
p := sw1.Peers().List()[0]
p.Send(Envelope{
p.NewSend(Envelope{
ChannelID: 0x1,
Message: &p2pproto.Message{},
})
@@ -848,7 +848,7 @@ func BenchmarkSwitchBroadcast(b *testing.B) {
// Send random message from foo channel to another
for i := 0; i < b.N; i++ {
chID := byte(i % 4)
successChan := s1.Broadcast(Envelope{ChannelID: chID})
successChan := s1.NewBroadcast(Envelope{ChannelID: chID})
for s := range successChan {
if s {
numSuccess++

View File

@@ -102,7 +102,7 @@ func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
}
// Receive implements p2p.Reactor.
func (r *Reactor) Receive(e p2p.Envelope) {
func (r *Reactor) NewReceive(e p2p.Envelope) {
if !r.IsRunning() {
return
}
@@ -126,7 +126,7 @@ func (r *Reactor) Receive(e p2p.Envelope) {
for _, snapshot := range snapshots {
r.Logger.Debug("Advertising snapshot", "height", snapshot.Height,
"format", snapshot.Format, "peer", e.Src.ID())
e.Src.Send(p2p.Envelope{
e.Src.NewSend(p2p.Envelope{
ChannelID: e.ChannelID,
Message: &ssproto.SnapshotsResponse{
Height: snapshot.Height,
@@ -181,7 +181,7 @@ func (r *Reactor) Receive(e p2p.Envelope) {
}
r.Logger.Debug("Sending chunk", "height", msg.Height, "format", msg.Format,
"chunk", msg.Index, "peer", e.Src.ID())
e.Src.Send(p2p.Envelope{
e.Src.NewSend(p2p.Envelope{
ChannelID: ChunkChannel,
Message: &ssproto.ChunkResponse{
Height: msg.Height,
@@ -272,7 +272,7 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
r.Logger.Debug("Requesting snapshots from known peers")
// Request snapshots from all currently connected peers
r.Switch.Broadcast(p2p.Envelope{
r.Switch.NewBroadcast(p2p.Envelope{
ChannelID: SnapshotChannel,
Message: &ssproto.SnapshotsRequest{},
})

View File

@@ -80,7 +80,7 @@ func TestReactor_Receive_ChunkRequest(t *testing.T) {
}
})
r.Receive(p2p.Envelope{
r.NewReceive(p2p.Envelope{
ChannelID: ChunkChannel,
Src: peer,
Message: tc.request,
@@ -170,7 +170,7 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
}
})
r.Receive(p2p.Envelope{
r.NewReceive(p2p.Envelope{
ChannelID: SnapshotChannel,
Src: peer,
Message: &ssproto.SnapshotsRequest{},

View File

@@ -130,7 +130,7 @@ func (s *syncer) AddPeer(peer p2p.Peer) {
ChannelID: SnapshotChannel,
Message: &ssproto.SnapshotsRequest{},
}
peer.Send(e)
peer.NewSend(e)
}
// RemovePeer removes a peer from the pool.
@@ -471,7 +471,7 @@ func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
}
s.logger.Debug("Requesting snapshot chunk", "height", snapshot.Height,
"format", snapshot.Format, "chunk", chunk, "peer", peer.ID())
peer.Send(p2p.Envelope{
peer.NewSend(p2p.Envelope{
ChannelID: ChunkChannel,
Message: &ssproto.ChunkRequest{
Height: snapshot.Height,