mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-04 04:04:00 +00:00
p2p: remove final shims from p2p package (#7136)
This is, perhaps, the trival final piece of #7075 that I've been working on. There's more work to be done: - push more of the setup into the pacakges themselves - move channel-based sending/filtering out of the - simplify the buffering throuhgout the p2p stack.
This commit is contained in:
@@ -17,27 +17,7 @@ import (
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
var (
|
||||
_ service.Service = (*Reactor)(nil)
|
||||
|
||||
// ChannelShims contains a map of ChannelDescriptorShim objects, where each
|
||||
// object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding
|
||||
// p2p proto.Message the new p2p Channel is responsible for handling.
|
||||
//
|
||||
//
|
||||
// TODO: Remove once p2p refactor is complete.
|
||||
// ref: https://github.com/tendermint/tendermint/issues/5670
|
||||
ChannelShims = []*p2p.ChannelDescriptor{
|
||||
{
|
||||
ID: BlockSyncChannel,
|
||||
MessageType: new(bcproto.Message),
|
||||
Priority: 5,
|
||||
SendQueueCapacity: 1000,
|
||||
RecvBufferCapacity: 1024,
|
||||
RecvMessageCapacity: MaxMsgSize,
|
||||
},
|
||||
}
|
||||
)
|
||||
var _ service.Service = (*Reactor)(nil)
|
||||
|
||||
const (
|
||||
// BlockSyncChannel is a channel for blocks and status updates
|
||||
@@ -55,6 +35,17 @@ const (
|
||||
syncTimeout = 60 * time.Second
|
||||
)
|
||||
|
||||
func GetChannelDescriptor() *p2p.ChannelDescriptor {
|
||||
return &p2p.ChannelDescriptor{
|
||||
ID: BlockSyncChannel,
|
||||
MessageType: new(bcproto.Message),
|
||||
Priority: 5,
|
||||
SendQueueCapacity: 1000,
|
||||
RecvBufferCapacity: 1024,
|
||||
RecvMessageCapacity: MaxMsgSize,
|
||||
}
|
||||
}
|
||||
|
||||
type consensusReactor interface {
|
||||
// For when we switch from block sync reactor to the consensus
|
||||
// machine.
|
||||
|
||||
@@ -21,15 +21,12 @@ import (
|
||||
var (
|
||||
_ service.Service = (*Reactor)(nil)
|
||||
_ p2p.Wrapper = (*tmcons.Message)(nil)
|
||||
)
|
||||
|
||||
// ChannelShims contains a map of ChannelDescriptorShim objects, where each
|
||||
// object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding
|
||||
// p2p proto.Message the new p2p Channel is responsible for handling.
|
||||
//
|
||||
//
|
||||
// TODO: Remove once p2p refactor is complete.
|
||||
// ref: https://github.com/tendermint/tendermint/issues/5670
|
||||
ChannelShims = []*p2p.ChannelDescriptor{
|
||||
// GetChannelDescriptor produces an instance of a descriptor for this
|
||||
// package's required channels.
|
||||
func GetChannelDescriptors() []*p2p.ChannelDescriptor {
|
||||
return []*p2p.ChannelDescriptor{
|
||||
{
|
||||
ID: StateChannel,
|
||||
MessageType: new(tmcons.Message),
|
||||
@@ -66,7 +63,7 @@ var (
|
||||
RecvMessageCapacity: maxMsgSize,
|
||||
},
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
const (
|
||||
StateChannel = p2p.ChannelID(0x20)
|
||||
|
||||
@@ -15,26 +15,7 @@ import (
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
var (
|
||||
_ service.Service = (*Reactor)(nil)
|
||||
|
||||
// ChannelShims contains a map of ChannelDescriptorShim objects, where each
|
||||
// object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding
|
||||
// p2p proto.Message the new p2p Channel is responsible for handling.
|
||||
//
|
||||
//
|
||||
// TODO: Remove once p2p refactor is complete.
|
||||
// ref: https://github.com/tendermint/tendermint/issues/5670
|
||||
ChannelShims = []*p2p.ChannelDescriptor{
|
||||
{
|
||||
ID: EvidenceChannel,
|
||||
MessageType: new(tmproto.EvidenceList),
|
||||
Priority: 6,
|
||||
RecvMessageCapacity: maxMsgSize,
|
||||
RecvBufferCapacity: 32,
|
||||
},
|
||||
}
|
||||
)
|
||||
var _ service.Service = (*Reactor)(nil)
|
||||
|
||||
const (
|
||||
EvidenceChannel = p2p.ChannelID(0x38)
|
||||
@@ -48,6 +29,18 @@ const (
|
||||
broadcastEvidenceIntervalS = 10
|
||||
)
|
||||
|
||||
// GetChannelDescriptor produces an instance of a descriptor for this
|
||||
// package's required channels.
|
||||
func GetChannelDescriptor() *p2p.ChannelDescriptor {
|
||||
return &p2p.ChannelDescriptor{
|
||||
ID: EvidenceChannel,
|
||||
MessageType: new(tmproto.EvidenceList),
|
||||
Priority: 6,
|
||||
RecvMessageCapacity: maxMsgSize,
|
||||
RecvBufferCapacity: 32,
|
||||
}
|
||||
}
|
||||
|
||||
// Reactor handles evpool evidence broadcasting amongst peers.
|
||||
type Reactor struct {
|
||||
service.BaseService
|
||||
|
||||
@@ -83,14 +83,9 @@ func NewReactor(
|
||||
return r
|
||||
}
|
||||
|
||||
// GetChannelShims returns a map of ChannelDescriptorShim objects, where each
|
||||
// object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding
|
||||
// p2p proto.Message the new p2p Channel is responsible for handling.
|
||||
//
|
||||
//
|
||||
// TODO: Remove once p2p refactor is complete.
|
||||
// ref: https://github.com/tendermint/tendermint/issues/5670
|
||||
func GetChannelShims(cfg *config.MempoolConfig) []*p2p.ChannelDescriptor {
|
||||
// GetChannelDescriptor produces an instance of a descriptor for this
|
||||
// package's required channels.
|
||||
func GetChannelDescriptor(cfg *config.MempoolConfig) *p2p.ChannelDescriptor {
|
||||
largestTx := make([]byte, cfg.MaxTxBytes)
|
||||
batchMsg := protomem.Message{
|
||||
Sum: &protomem.Message_Txs{
|
||||
@@ -98,14 +93,12 @@ func GetChannelShims(cfg *config.MempoolConfig) []*p2p.ChannelDescriptor {
|
||||
},
|
||||
}
|
||||
|
||||
return []*p2p.ChannelDescriptor{
|
||||
{
|
||||
ID: mempool.MempoolChannel,
|
||||
MessageType: new(protomem.Message),
|
||||
Priority: 5,
|
||||
RecvMessageCapacity: batchMsg.Size(),
|
||||
RecvBufferCapacity: 128,
|
||||
},
|
||||
return &p2p.ChannelDescriptor{
|
||||
ID: mempool.MempoolChannel,
|
||||
MessageType: new(protomem.Message),
|
||||
Priority: 5,
|
||||
RecvMessageCapacity: batchMsg.Size(),
|
||||
RecvBufferCapacity: 128,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -50,7 +50,7 @@ func setup(t *testing.T, config *config.MempoolConfig, numNodes int, chBuf uint)
|
||||
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
|
||||
}
|
||||
|
||||
chDesc := GetChannelShims(config)[0]
|
||||
chDesc := GetChannelDescriptor(config)
|
||||
chDesc.RecvBufferCapacity = int(chBuf)
|
||||
rts.mempoolChnnels = rts.network.MakeChannelsNoCleanup(t, chDesc)
|
||||
|
||||
|
||||
@@ -90,14 +90,9 @@ func NewReactor(
|
||||
|
||||
func defaultObservePanic(r interface{}) {}
|
||||
|
||||
// GetChannelShims returns a map of ChannelDescriptorShim objects, where each
|
||||
// object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding
|
||||
// p2p proto.Message the new p2p Channel is responsible for handling.
|
||||
//
|
||||
//
|
||||
// TODO: Remove once p2p refactor is complete.
|
||||
// ref: https://github.com/tendermint/tendermint/issues/5670
|
||||
func GetChannelShims(cfg *config.MempoolConfig) []*p2p.ChannelDescriptor {
|
||||
// GetChannelDescriptor produces an instance of a descriptor for this
|
||||
// package's required channels.
|
||||
func GetChannelDescriptor(cfg *config.MempoolConfig) *p2p.ChannelDescriptor {
|
||||
largestTx := make([]byte, cfg.MaxTxBytes)
|
||||
batchMsg := protomem.Message{
|
||||
Sum: &protomem.Message_Txs{
|
||||
@@ -105,14 +100,12 @@ func GetChannelShims(cfg *config.MempoolConfig) []*p2p.ChannelDescriptor {
|
||||
},
|
||||
}
|
||||
|
||||
return []*p2p.ChannelDescriptor{
|
||||
{
|
||||
ID: mempool.MempoolChannel,
|
||||
MessageType: new(protomem.Message),
|
||||
Priority: 5,
|
||||
RecvMessageCapacity: batchMsg.Size(),
|
||||
RecvBufferCapacity: 128,
|
||||
},
|
||||
return &p2p.ChannelDescriptor{
|
||||
ID: mempool.MempoolChannel,
|
||||
MessageType: new(protomem.Message),
|
||||
Priority: 5,
|
||||
RecvMessageCapacity: batchMsg.Size(),
|
||||
RecvBufferCapacity: 128,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -50,7 +50,7 @@ func setupReactors(t *testing.T, numNodes int, chBuf uint) *reactorTestSuite {
|
||||
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
|
||||
}
|
||||
|
||||
chDesc := GetChannelShims(cfg.Mempool)[0]
|
||||
chDesc := GetChannelDescriptor(cfg.Mempool)
|
||||
rts.mempoolChannels = rts.network.MakeChannelsNoCleanup(t, chDesc)
|
||||
|
||||
for nodeID := range rts.network.Nodes {
|
||||
|
||||
@@ -81,8 +81,8 @@ type MConnection struct {
|
||||
recvMonitor *flow.Monitor
|
||||
send chan struct{}
|
||||
pong chan struct{}
|
||||
channels []*Channel
|
||||
channelsIdx map[ChannelID]*Channel
|
||||
channels []*channel
|
||||
channelsIdx map[ChannelID]*channel
|
||||
onReceive receiveCbFunc
|
||||
onError errorCbFunc
|
||||
errored uint32
|
||||
@@ -186,8 +186,8 @@ func NewMConnectionWithConfig(
|
||||
}
|
||||
|
||||
// Create channels
|
||||
var channelsIdx = map[ChannelID]*Channel{}
|
||||
var channels = []*Channel{}
|
||||
var channelsIdx = map[ChannelID]*channel{}
|
||||
var channels = []*channel{}
|
||||
|
||||
for _, desc := range chDescs {
|
||||
channel := newChannel(mconn, *desc)
|
||||
@@ -436,7 +436,7 @@ func (c *MConnection) sendPacketMsg() bool {
|
||||
// Choose a channel to create a PacketMsg from.
|
||||
// The chosen channel will be the one whose recentlySent/priority is the least.
|
||||
var leastRatio float32 = math.MaxFloat32
|
||||
var leastChannel *Channel
|
||||
var leastChannel *channel
|
||||
for _, channel := range c.channels {
|
||||
// If nothing to send, skip this channel
|
||||
if !channel.isSendPending() {
|
||||
@@ -639,9 +639,8 @@ func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: lowercase.
|
||||
// NOTE: not goroutine-safe.
|
||||
type Channel struct {
|
||||
type channel struct {
|
||||
// Exponential moving average.
|
||||
// This field must be accessed atomically.
|
||||
// It is first in the struct to ensure correct alignment.
|
||||
@@ -660,12 +659,12 @@ type Channel struct {
|
||||
Logger log.Logger
|
||||
}
|
||||
|
||||
func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
|
||||
func newChannel(conn *MConnection, desc ChannelDescriptor) *channel {
|
||||
desc = desc.FillDefaults()
|
||||
if desc.Priority <= 0 {
|
||||
panic("Channel default priority must be a positive integer")
|
||||
}
|
||||
return &Channel{
|
||||
return &channel{
|
||||
conn: conn,
|
||||
desc: desc,
|
||||
sendQueue: make(chan []byte, desc.SendQueueCapacity),
|
||||
@@ -674,14 +673,14 @@ func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
|
||||
}
|
||||
}
|
||||
|
||||
func (ch *Channel) SetLogger(l log.Logger) {
|
||||
func (ch *channel) SetLogger(l log.Logger) {
|
||||
ch.Logger = l
|
||||
}
|
||||
|
||||
// Queues message to send to this channel.
|
||||
// Goroutine-safe
|
||||
// Times out (and returns false) after defaultSendTimeout
|
||||
func (ch *Channel) sendBytes(bytes []byte) bool {
|
||||
func (ch *channel) sendBytes(bytes []byte) bool {
|
||||
select {
|
||||
case ch.sendQueue <- bytes:
|
||||
atomic.AddInt32(&ch.sendQueueSize, 1)
|
||||
@@ -694,7 +693,7 @@ func (ch *Channel) sendBytes(bytes []byte) bool {
|
||||
// Returns true if any PacketMsgs are pending to be sent.
|
||||
// Call before calling nextPacketMsg()
|
||||
// Goroutine-safe
|
||||
func (ch *Channel) isSendPending() bool {
|
||||
func (ch *channel) isSendPending() bool {
|
||||
if len(ch.sending) == 0 {
|
||||
if len(ch.sendQueue) == 0 {
|
||||
return false
|
||||
@@ -706,7 +705,7 @@ func (ch *Channel) isSendPending() bool {
|
||||
|
||||
// Creates a new PacketMsg to send.
|
||||
// Not goroutine-safe
|
||||
func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg {
|
||||
func (ch *channel) nextPacketMsg() tmp2p.PacketMsg {
|
||||
packet := tmp2p.PacketMsg{ChannelID: int32(ch.desc.ID)}
|
||||
maxSize := ch.maxPacketMsgPayloadSize
|
||||
packet.Data = ch.sending[:tmmath.MinInt(maxSize, len(ch.sending))]
|
||||
@@ -723,7 +722,7 @@ func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg {
|
||||
|
||||
// Writes next PacketMsg to w and updates c.recentlySent.
|
||||
// Not goroutine-safe
|
||||
func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) {
|
||||
func (ch *channel) writePacketMsgTo(w io.Writer) (n int, err error) {
|
||||
packet := ch.nextPacketMsg()
|
||||
n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet))
|
||||
atomic.AddInt64(&ch.recentlySent, int64(n))
|
||||
@@ -733,7 +732,7 @@ func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) {
|
||||
// Handles incoming PacketMsgs. It returns a message bytes if message is
|
||||
// complete, which is owned by the caller and will not be modified.
|
||||
// Not goroutine-safe
|
||||
func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) {
|
||||
func (ch *channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) {
|
||||
ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
|
||||
var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Data)
|
||||
if recvCap < recvReceived {
|
||||
@@ -750,7 +749,7 @@ func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) {
|
||||
|
||||
// Call this periodically to update stats for throttling purposes.
|
||||
// Not goroutine-safe
|
||||
func (ch *Channel) updateStats() {
|
||||
func (ch *channel) updateStats() {
|
||||
// Exponential decay of stats.
|
||||
// TODO: optimize.
|
||||
atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8))
|
||||
|
||||
@@ -28,48 +28,6 @@ import (
|
||||
var (
|
||||
_ service.Service = (*Reactor)(nil)
|
||||
_ p2p.Wrapper = (*ssproto.Message)(nil)
|
||||
|
||||
// ChannelShims contains a map of ChannelDescriptorShim objects, where each
|
||||
// object wraps a reference to a legacy p2p ChannelDescriptor and the corresponding
|
||||
// p2p proto.Message the new p2p Channel is responsible for handling.
|
||||
//
|
||||
//
|
||||
// TODO: Remove once p2p refactor is complete.
|
||||
// ref: https://github.com/tendermint/tendermint/issues/5670
|
||||
ChannelShims = []*p2p.ChannelDescriptor{
|
||||
{
|
||||
ID: SnapshotChannel,
|
||||
MessageType: new(ssproto.Message),
|
||||
Priority: 6,
|
||||
SendQueueCapacity: 10,
|
||||
RecvMessageCapacity: snapshotMsgSize,
|
||||
RecvBufferCapacity: 128,
|
||||
},
|
||||
{
|
||||
ID: ChunkChannel,
|
||||
Priority: 3,
|
||||
MessageType: new(ssproto.Message),
|
||||
SendQueueCapacity: 4,
|
||||
RecvMessageCapacity: chunkMsgSize,
|
||||
RecvBufferCapacity: 128,
|
||||
},
|
||||
{
|
||||
ID: LightBlockChannel,
|
||||
MessageType: new(ssproto.Message),
|
||||
Priority: 5,
|
||||
SendQueueCapacity: 10,
|
||||
RecvMessageCapacity: lightBlockMsgSize,
|
||||
RecvBufferCapacity: 128,
|
||||
},
|
||||
{
|
||||
ID: ParamsChannel,
|
||||
MessageType: new(ssproto.Message),
|
||||
Priority: 2,
|
||||
SendQueueCapacity: 10,
|
||||
RecvMessageCapacity: paramMsgSize,
|
||||
RecvBufferCapacity: 128,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -113,6 +71,45 @@ const (
|
||||
maxLightBlockRequestRetries = 20
|
||||
)
|
||||
|
||||
func GetChannelDescriptors() []*p2p.ChannelDescriptor {
|
||||
return []*p2p.ChannelDescriptor{
|
||||
{
|
||||
|
||||
ID: SnapshotChannel,
|
||||
MessageType: new(ssproto.Message),
|
||||
Priority: 6,
|
||||
SendQueueCapacity: 10,
|
||||
RecvMessageCapacity: snapshotMsgSize,
|
||||
RecvBufferCapacity: 128,
|
||||
},
|
||||
{
|
||||
ID: ChunkChannel,
|
||||
Priority: 3,
|
||||
MessageType: new(ssproto.Message),
|
||||
SendQueueCapacity: 4,
|
||||
RecvMessageCapacity: chunkMsgSize,
|
||||
RecvBufferCapacity: 128,
|
||||
},
|
||||
{
|
||||
ID: LightBlockChannel,
|
||||
MessageType: new(ssproto.Message),
|
||||
Priority: 5,
|
||||
SendQueueCapacity: 10,
|
||||
RecvMessageCapacity: lightBlockMsgSize,
|
||||
RecvBufferCapacity: 128,
|
||||
},
|
||||
{
|
||||
ID: ParamsChannel,
|
||||
MessageType: new(ssproto.Message),
|
||||
Priority: 2,
|
||||
SendQueueCapacity: 10,
|
||||
RecvMessageCapacity: paramMsgSize,
|
||||
RecvBufferCapacity: 128,
|
||||
},
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Metricer defines an interface used for the rpc sync info query, please see statesync.metrics
|
||||
// for the details.
|
||||
type Metricer interface {
|
||||
|
||||
37
node/node.go
37
node/node.go
@@ -303,11 +303,14 @@ func makeNode(cfg *config.Config,
|
||||
sm.BlockExecutorWithMetrics(nodeMetrics.state),
|
||||
)
|
||||
|
||||
csReactor, csState := createConsensusReactor(
|
||||
csReactor, csState, err := createConsensusReactor(
|
||||
cfg, state, blockExec, blockStore, mp, evPool,
|
||||
privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus,
|
||||
peerManager, router, consensusLogger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
}
|
||||
|
||||
// Create the blockchain reactor. Note, we do not start block sync if we're
|
||||
// doing a state sync first.
|
||||
@@ -334,7 +337,17 @@ func makeNode(cfg *config.Config,
|
||||
// we should clean this whole thing up. See:
|
||||
// https://github.com/tendermint/tendermint/issues/4644
|
||||
ssLogger := logger.With("module", "statesync")
|
||||
channels := makeChannelsFromShims(router, statesync.ChannelShims)
|
||||
ssChDesc := statesync.GetChannelDescriptors()
|
||||
channels := make(map[p2p.ChannelID]*p2p.Channel, len(ssChDesc))
|
||||
for idx := range ssChDesc {
|
||||
chd := ssChDesc[idx]
|
||||
ch, err := router.OpenChannel(chd)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
channels[ch.ID] = ch
|
||||
}
|
||||
|
||||
peerUpdates := peerManager.Subscribe()
|
||||
stateSyncReactor := statesync.NewReactor(
|
||||
@@ -1088,23 +1101,3 @@ func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOpt
|
||||
|
||||
return opts
|
||||
}
|
||||
|
||||
// FIXME: Temporary helper function, shims should be removed.
|
||||
func makeChannelsFromShims(
|
||||
router *p2p.Router,
|
||||
chDescs []*p2p.ChannelDescriptor,
|
||||
) map[p2p.ChannelID]*p2p.Channel {
|
||||
|
||||
channels := map[p2p.ChannelID]*p2p.Channel{}
|
||||
for idx := range chDescs {
|
||||
chDesc := chDescs[idx]
|
||||
ch, err := router.OpenChannel(chDesc)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("failed to open channel %v: %v", chDesc.ID, err))
|
||||
}
|
||||
|
||||
channels[chDesc.ID] = ch
|
||||
}
|
||||
|
||||
return channels
|
||||
}
|
||||
|
||||
@@ -198,13 +198,15 @@ func createMempoolReactor(
|
||||
) (service.Service, mempool.Mempool, error) {
|
||||
|
||||
logger = logger.With("module", "mempool", "version", cfg.Mempool.Version)
|
||||
channelShims := mempoolv0.GetChannelShims(cfg.Mempool)
|
||||
|
||||
channels := makeChannelsFromShims(router, channelShims)
|
||||
peerUpdates := peerManager.Subscribe()
|
||||
|
||||
switch cfg.Mempool.Version {
|
||||
case config.MempoolV0:
|
||||
ch, err := router.OpenChannel(mempoolv0.GetChannelDescriptor(cfg.Mempool))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
mp := mempoolv0.NewCListMempool(
|
||||
cfg.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
@@ -221,7 +223,7 @@ func createMempoolReactor(
|
||||
cfg.Mempool,
|
||||
peerManager,
|
||||
mp,
|
||||
channels[mempool.MempoolChannel],
|
||||
ch,
|
||||
peerUpdates,
|
||||
)
|
||||
|
||||
@@ -232,6 +234,11 @@ func createMempoolReactor(
|
||||
return reactor, mp, nil
|
||||
|
||||
case config.MempoolV1:
|
||||
ch, err := router.OpenChannel(mempoolv1.GetChannelDescriptor(cfg.Mempool))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
mp := mempoolv1.NewTxMempool(
|
||||
logger,
|
||||
cfg.Mempool,
|
||||
@@ -247,7 +254,7 @@ func createMempoolReactor(
|
||||
cfg.Mempool,
|
||||
peerManager,
|
||||
mp,
|
||||
channels[mempool.MempoolChannel],
|
||||
ch,
|
||||
peerUpdates,
|
||||
)
|
||||
|
||||
@@ -283,9 +290,14 @@ func createEvidenceReactor(
|
||||
return nil, nil, fmt.Errorf("creating evidence pool: %w", err)
|
||||
}
|
||||
|
||||
ch, err := router.OpenChannel(evidence.GetChannelDescriptor())
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("creating evidence channel: %w", err)
|
||||
}
|
||||
|
||||
evidenceReactor := evidence.NewReactor(
|
||||
logger,
|
||||
makeChannelsFromShims(router, evidence.ChannelShims)[evidence.EvidenceChannel],
|
||||
ch,
|
||||
peerManager.Subscribe(),
|
||||
evidencePool,
|
||||
)
|
||||
@@ -307,12 +319,16 @@ func createBlockchainReactor(
|
||||
|
||||
logger = logger.With("module", "blockchain")
|
||||
|
||||
channels := makeChannelsFromShims(router, blocksync.ChannelShims)
|
||||
ch, err := router.OpenChannel(blocksync.GetChannelDescriptor())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
peerUpdates := peerManager.Subscribe()
|
||||
|
||||
reactor, err := blocksync.NewReactor(
|
||||
logger, state.Copy(), blockExec, blockStore, csReactor,
|
||||
channels[blocksync.BlockSyncChannel], peerUpdates, blockSync,
|
||||
ch, peerUpdates, blockSync,
|
||||
metrics,
|
||||
)
|
||||
if err != nil {
|
||||
@@ -336,7 +352,7 @@ func createConsensusReactor(
|
||||
peerManager *p2p.PeerManager,
|
||||
router *p2p.Router,
|
||||
logger log.Logger,
|
||||
) (*consensus.Reactor, *consensus.State) {
|
||||
) (*consensus.Reactor, *consensus.State, error) {
|
||||
|
||||
consensusState := consensus.NewState(
|
||||
cfg.Consensus,
|
||||
@@ -352,13 +368,19 @@ func createConsensusReactor(
|
||||
consensusState.SetPrivValidator(privValidator)
|
||||
}
|
||||
|
||||
var (
|
||||
channels map[p2p.ChannelID]*p2p.Channel
|
||||
peerUpdates *p2p.PeerUpdates
|
||||
)
|
||||
csChDesc := consensus.GetChannelDescriptors()
|
||||
channels := make(map[p2p.ChannelID]*p2p.Channel, len(csChDesc))
|
||||
for idx := range csChDesc {
|
||||
chd := csChDesc[idx]
|
||||
ch, err := router.OpenChannel(chd)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
channels = makeChannelsFromShims(router, consensus.ChannelShims)
|
||||
peerUpdates = peerManager.Subscribe()
|
||||
channels[ch.ID] = ch
|
||||
}
|
||||
|
||||
peerUpdates := peerManager.Subscribe()
|
||||
|
||||
reactor := consensus.NewReactor(
|
||||
logger,
|
||||
@@ -376,7 +398,7 @@ func createConsensusReactor(
|
||||
// consensusReactor will set it on consensusState and blockExecutor.
|
||||
reactor.SetEventBus(eventBus)
|
||||
|
||||
return reactor, consensusState
|
||||
return reactor, consensusState, nil
|
||||
}
|
||||
|
||||
func createTransport(logger log.Logger, cfg *config.Config) *p2p.MConnTransport {
|
||||
|
||||
Reference in New Issue
Block a user