mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-10 15:07:24 +00:00
s/Manager/Agent/g
This commit is contained in:
@@ -8,7 +8,7 @@ TenderMint - proof of concept
|
||||
* **[mempool](https://github.com/tendermint/tendermint/blob/master/mempool):** Handles the broadcasting of uncommitted transactions.
|
||||
* **[crypto](https://github.com/tendermint/tendermint/blob/master/crypto):** Includes cgo bindings of ed25519.
|
||||
|
||||
### Status
|
||||
### Development Status
|
||||
|
||||
* Mempool *now*
|
||||
* Consensus *complete*
|
||||
@@ -17,3 +17,7 @@ TenderMint - proof of concept
|
||||
* p2p/* *complete*
|
||||
* Ed25519 bindings *complete*
|
||||
* merkle/* *complete*
|
||||
|
||||
### Issues
|
||||
|
||||
* merkle/* does not free old children nodes. Implement something memory-aware that makes merkle/* act like a weakly referenced map.
|
||||
|
||||
@@ -49,7 +49,7 @@ func calcRoundStartTime(round uint16, startTime time.Time) time.Time {
|
||||
roundDurationDelta*(time.Duration((int64(round)*int64(round)-int64(round))/2)))
|
||||
}
|
||||
|
||||
// calcs the current round given startTime of round zero.
|
||||
// calculates the current round given startTime of round zero.
|
||||
// NOTE: round is zero if startTime is in the future.
|
||||
func calcRound(startTime time.Time) uint16 {
|
||||
now := time.Now()
|
||||
@@ -90,14 +90,14 @@ func calcRoundInfo(startTime time.Time) (round uint16, roundStartTime time.Time,
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
type ConsensusManager struct {
|
||||
type ConsensusAgent struct {
|
||||
sw *p2p.Switch
|
||||
swEvents chan interface{}
|
||||
quit chan struct{}
|
||||
started uint32
|
||||
stopped uint32
|
||||
|
||||
cs *ConsensusState
|
||||
conS *ConsensusState
|
||||
blockStore *BlockStore
|
||||
doActionCh chan RoundAction
|
||||
|
||||
@@ -109,61 +109,61 @@ type ConsensusManager struct {
|
||||
stagedState *State
|
||||
}
|
||||
|
||||
func NewConsensusManager(sw *p2p.Switch, state *State, blockStore *BlockStore) *ConsensusManager {
|
||||
func NewConsensusAgent(sw *p2p.Switch, state *State, blockStore *BlockStore) *ConsensusAgent {
|
||||
swEvents := make(chan interface{})
|
||||
sw.AddEventListener("ConsensusManager.swEvents", swEvents)
|
||||
cs := NewConsensusState(state)
|
||||
cm := &ConsensusManager{
|
||||
sw.AddEventListener("ConsensusAgent.swEvents", swEvents)
|
||||
conS := NewConsensusState(state)
|
||||
conA := &ConsensusAgent{
|
||||
sw: sw,
|
||||
swEvents: swEvents,
|
||||
quit: make(chan struct{}),
|
||||
|
||||
cs: cs,
|
||||
conS: conS,
|
||||
blockStore: blockStore,
|
||||
doActionCh: make(chan RoundAction, 1),
|
||||
|
||||
state: state,
|
||||
peerStates: make(map[string]*PeerState),
|
||||
}
|
||||
return cm
|
||||
return conA
|
||||
}
|
||||
|
||||
// Sets our private validator account for signing votes.
|
||||
func (cm *ConsensusManager) SetPrivValidator(priv *PrivValidator) {
|
||||
cm.mtx.Lock()
|
||||
defer cm.mtx.Unlock()
|
||||
cm.privValidator = priv
|
||||
func (conA *ConsensusAgent) SetPrivValidator(priv *PrivValidator) {
|
||||
conA.mtx.Lock()
|
||||
defer conA.mtx.Unlock()
|
||||
conA.privValidator = priv
|
||||
}
|
||||
|
||||
func (cm *ConsensusManager) PrivValidator() *PrivValidator {
|
||||
cm.mtx.Lock()
|
||||
defer cm.mtx.Unlock()
|
||||
return cm.privValidator
|
||||
func (conA *ConsensusAgent) PrivValidator() *PrivValidator {
|
||||
conA.mtx.Lock()
|
||||
defer conA.mtx.Unlock()
|
||||
return conA.privValidator
|
||||
}
|
||||
|
||||
func (cm *ConsensusManager) Start() {
|
||||
if atomic.CompareAndSwapUint32(&cm.started, 0, 1) {
|
||||
log.Info("Starting ConsensusManager")
|
||||
go cm.switchEventsRoutine()
|
||||
go cm.gossipProposalRoutine()
|
||||
go cm.knownPartsRoutine()
|
||||
go cm.gossipVoteRoutine()
|
||||
go cm.proposeAndVoteRoutine()
|
||||
func (conA *ConsensusAgent) Start() {
|
||||
if atomic.CompareAndSwapUint32(&conA.started, 0, 1) {
|
||||
log.Info("Starting ConsensusAgent")
|
||||
go conA.switchEventsRoutine()
|
||||
go conA.gossipProposalRoutine()
|
||||
go conA.knownPartsRoutine()
|
||||
go conA.gossipVoteRoutine()
|
||||
go conA.proposeAndVoteRoutine()
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *ConsensusManager) Stop() {
|
||||
if atomic.CompareAndSwapUint32(&cm.stopped, 0, 1) {
|
||||
log.Info("Stopping ConsensusManager")
|
||||
close(cm.quit)
|
||||
close(cm.swEvents)
|
||||
func (conA *ConsensusAgent) Stop() {
|
||||
if atomic.CompareAndSwapUint32(&conA.stopped, 0, 1) {
|
||||
log.Info("Stopping ConsensusAgent")
|
||||
close(conA.quit)
|
||||
close(conA.swEvents)
|
||||
}
|
||||
}
|
||||
|
||||
// Handle peer new/done events
|
||||
func (cm *ConsensusManager) switchEventsRoutine() {
|
||||
func (conA *ConsensusAgent) switchEventsRoutine() {
|
||||
for {
|
||||
swEvent, ok := <-cm.swEvents
|
||||
swEvent, ok := <-conA.swEvents
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
@@ -171,24 +171,24 @@ func (cm *ConsensusManager) switchEventsRoutine() {
|
||||
case p2p.SwitchEventNewPeer:
|
||||
event := swEvent.(p2p.SwitchEventNewPeer)
|
||||
// Create peerState for event.Peer
|
||||
cm.mtx.Lock()
|
||||
cm.peerStates[event.Peer.Key] = NewPeerState(event.Peer)
|
||||
cm.mtx.Unlock()
|
||||
conA.mtx.Lock()
|
||||
conA.peerStates[event.Peer.Key] = NewPeerState(event.Peer)
|
||||
conA.mtx.Unlock()
|
||||
// Share our state with event.Peer
|
||||
// By sending KnownBlockPartsMessage,
|
||||
// we send our height/round + startTime, and known block parts,
|
||||
// which is sufficient for the peer to begin interacting with us.
|
||||
event.Peer.TrySend(ProposalCh, cm.makeKnownBlockPartsMessage(cm.cs.RoundState()))
|
||||
event.Peer.TrySend(ProposalCh, conA.makeKnownBlockPartsMessage(conA.conS.RoundState()))
|
||||
case p2p.SwitchEventDonePeer:
|
||||
event := swEvent.(p2p.SwitchEventDonePeer)
|
||||
// Delete peerState for event.Peer
|
||||
cm.mtx.Lock()
|
||||
peerState := cm.peerStates[event.Peer.Key]
|
||||
conA.mtx.Lock()
|
||||
peerState := conA.peerStates[event.Peer.Key]
|
||||
if peerState != nil {
|
||||
peerState.Disconnect()
|
||||
delete(cm.peerStates, event.Peer.Key)
|
||||
delete(conA.peerStates, event.Peer.Key)
|
||||
}
|
||||
cm.mtx.Unlock()
|
||||
conA.mtx.Unlock()
|
||||
default:
|
||||
log.Warning("Unhandled switch event type")
|
||||
}
|
||||
@@ -196,7 +196,7 @@ func (cm *ConsensusManager) switchEventsRoutine() {
|
||||
}
|
||||
|
||||
// Like, how large is it and how often can we send it?
|
||||
func (cm *ConsensusManager) makeKnownBlockPartsMessage(rs *RoundState) *KnownBlockPartsMessage {
|
||||
func (conA *ConsensusAgent) makeKnownBlockPartsMessage(rs *RoundState) *KnownBlockPartsMessage {
|
||||
return &KnownBlockPartsMessage{
|
||||
Height: rs.Height,
|
||||
SecondsSinceStartTime: uint32(time.Now().Sub(rs.StartTime).Seconds()),
|
||||
@@ -205,20 +205,20 @@ func (cm *ConsensusManager) makeKnownBlockPartsMessage(rs *RoundState) *KnownBlo
|
||||
}
|
||||
|
||||
// NOTE: may return nil, but (nil).Wants*() returns false.
|
||||
func (cm *ConsensusManager) getPeerState(peer *p2p.Peer) *PeerState {
|
||||
cm.mtx.Lock()
|
||||
defer cm.mtx.Unlock()
|
||||
return cm.peerStates[peer.Key]
|
||||
func (conA *ConsensusAgent) getPeerState(peer *p2p.Peer) *PeerState {
|
||||
conA.mtx.Lock()
|
||||
defer conA.mtx.Unlock()
|
||||
return conA.peerStates[peer.Key]
|
||||
}
|
||||
|
||||
func (cm *ConsensusManager) gossipProposalRoutine() {
|
||||
func (conA *ConsensusAgent) gossipProposalRoutine() {
|
||||
OUTER_LOOP:
|
||||
for {
|
||||
// Get round state
|
||||
rs := cm.cs.RoundState()
|
||||
rs := conA.conS.RoundState()
|
||||
|
||||
// Receive incoming message on ProposalCh
|
||||
inMsg, ok := cm.sw.Receive(ProposalCh)
|
||||
inMsg, ok := conA.sw.Receive(ProposalCh)
|
||||
if !ok {
|
||||
break OUTER_LOOP // Client has stopped
|
||||
}
|
||||
@@ -243,7 +243,7 @@ OUTER_LOOP:
|
||||
|
||||
// If we are the proposer, then don't do anything else.
|
||||
// We're already sending peers our proposal on another routine.
|
||||
privValidator := cm.PrivValidator()
|
||||
privValidator := conA.PrivValidator()
|
||||
if privValidator != nil && rs.Proposer.Account.Id == privValidator.Id {
|
||||
continue OUTER_LOOP
|
||||
}
|
||||
@@ -258,10 +258,10 @@ OUTER_LOOP:
|
||||
if added {
|
||||
// If peer wants this part, send peer the part
|
||||
// and our new blockParts state.
|
||||
kbpMsg := cm.makeKnownBlockPartsMessage(rs)
|
||||
kbpMsg := conA.makeKnownBlockPartsMessage(rs)
|
||||
partMsg := &BlockPartMessage{BlockPart: msg.BlockPart}
|
||||
for _, peer := range cm.sw.Peers().List() {
|
||||
peerState := cm.getPeerState(peer)
|
||||
for _, peer := range conA.sw.Peers().List() {
|
||||
peerState := conA.getPeerState(peer)
|
||||
if peerState.WantsBlockPart(msg.BlockPart) {
|
||||
peer.TrySend(KnownPartsCh, kbpMsg)
|
||||
peer.TrySend(ProposalCh, partMsg)
|
||||
@@ -277,18 +277,18 @@ OUTER_LOOP:
|
||||
|
||||
default:
|
||||
// Ignore unknown message
|
||||
// cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage)
|
||||
// conA.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage)
|
||||
}
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
}
|
||||
|
||||
func (cm *ConsensusManager) knownPartsRoutine() {
|
||||
func (conA *ConsensusAgent) knownPartsRoutine() {
|
||||
OUTER_LOOP:
|
||||
for {
|
||||
// Receive incoming message on ProposalCh
|
||||
inMsg, ok := cm.sw.Receive(KnownPartsCh)
|
||||
inMsg, ok := conA.sw.Receive(KnownPartsCh)
|
||||
if !ok {
|
||||
break OUTER_LOOP // Client has stopped
|
||||
}
|
||||
@@ -298,10 +298,10 @@ OUTER_LOOP:
|
||||
msg, ok := msg_.(*KnownBlockPartsMessage)
|
||||
if !ok {
|
||||
// Ignore unknown message type
|
||||
// cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage)
|
||||
// conA.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage)
|
||||
continue OUTER_LOOP
|
||||
}
|
||||
peerState := cm.getPeerState(inMsg.MConn.Peer)
|
||||
peerState := conA.getPeerState(inMsg.MConn.Peer)
|
||||
if !peerState.IsConnected() {
|
||||
// Peer disconnected before we were able to process.
|
||||
continue OUTER_LOOP
|
||||
@@ -314,27 +314,27 @@ OUTER_LOOP:
|
||||
|
||||
// Signs a vote document and broadcasts it.
|
||||
// hash can be nil to vote "nil"
|
||||
func (cm *ConsensusManager) signAndVote(vote *Vote) error {
|
||||
privValidator := cm.PrivValidator()
|
||||
func (conA *ConsensusAgent) signAndVote(vote *Vote) error {
|
||||
privValidator := conA.PrivValidator()
|
||||
if privValidator != nil {
|
||||
err := privValidator.SignVote(vote)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
msg := p2p.TypedMessage{msgTypeVote, vote}
|
||||
cm.sw.Broadcast(VoteCh, msg)
|
||||
conA.sw.Broadcast(VoteCh, msg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cm *ConsensusManager) stageProposal(proposal *BlockPartSet) error {
|
||||
func (conA *ConsensusAgent) stageProposal(proposal *BlockPartSet) error {
|
||||
// Already staged?
|
||||
cm.mtx.Lock()
|
||||
if cm.stagedProposal == proposal {
|
||||
cm.mtx.Unlock()
|
||||
conA.mtx.Lock()
|
||||
if conA.stagedProposal == proposal {
|
||||
conA.mtx.Unlock()
|
||||
return nil
|
||||
} else {
|
||||
cm.mtx.Unlock()
|
||||
conA.mtx.Unlock()
|
||||
}
|
||||
|
||||
// Basic validation
|
||||
@@ -348,9 +348,9 @@ func (cm *ConsensusManager) stageProposal(proposal *BlockPartSet) error {
|
||||
}
|
||||
|
||||
// Create a copy of the state for staging
|
||||
cm.mtx.Lock()
|
||||
stateCopy := cm.state.Copy() // Deep copy the state before staging.
|
||||
cm.mtx.Unlock()
|
||||
conA.mtx.Lock()
|
||||
stateCopy := conA.state.Copy() // Deep copy the state before staging.
|
||||
conA.mtx.Unlock()
|
||||
|
||||
// Commit block onto the copied state.
|
||||
err = stateCopy.CommitBlock(block)
|
||||
@@ -359,15 +359,15 @@ func (cm *ConsensusManager) stageProposal(proposal *BlockPartSet) error {
|
||||
}
|
||||
|
||||
// Looks good!
|
||||
cm.mtx.Lock()
|
||||
cm.stagedProposal = proposal
|
||||
cm.stagedState = stateCopy
|
||||
cm.mtx.Unlock()
|
||||
conA.mtx.Lock()
|
||||
conA.stagedProposal = proposal
|
||||
conA.stagedState = stateCopy
|
||||
conA.mtx.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Constructs an unsigned proposal
|
||||
func (cm *ConsensusManager) constructProposal(rs *RoundState) (*BlockPartSet, error) {
|
||||
func (conA *ConsensusAgent) constructProposal(rs *RoundState) (*BlockPartSet, error) {
|
||||
// XXX implement, first implement mempool
|
||||
// proposal := block.ToBlockPartSet()
|
||||
return nil, nil
|
||||
@@ -376,12 +376,12 @@ func (cm *ConsensusManager) constructProposal(rs *RoundState) (*BlockPartSet, er
|
||||
// Vote for (or against) the proposal for this round.
|
||||
// Call during transition from RoundStepProposal to RoundStepVote.
|
||||
// We may not have received a full proposal.
|
||||
func (cm *ConsensusManager) voteProposal(rs *RoundState) error {
|
||||
func (conA *ConsensusAgent) voteProposal(rs *RoundState) error {
|
||||
// If we're locked, must vote that.
|
||||
locked := cm.cs.LockedProposal()
|
||||
locked := conA.conS.LockedProposal()
|
||||
if locked != nil {
|
||||
block := locked.Block()
|
||||
err := cm.signAndVote(&Vote{
|
||||
err := conA.signAndVote(&Vote{
|
||||
Height: rs.Height,
|
||||
Round: rs.Round,
|
||||
Type: VoteTypeBare,
|
||||
@@ -390,10 +390,10 @@ func (cm *ConsensusManager) voteProposal(rs *RoundState) error {
|
||||
return err
|
||||
}
|
||||
// Stage proposal
|
||||
err := cm.stageProposal(rs.Proposal)
|
||||
err := conA.stageProposal(rs.Proposal)
|
||||
if err != nil {
|
||||
// Vote for nil, whatever the error.
|
||||
err := cm.signAndVote(&Vote{
|
||||
err := conA.signAndVote(&Vote{
|
||||
Height: rs.Height,
|
||||
Round: rs.Round,
|
||||
Type: VoteTypeBare,
|
||||
@@ -402,7 +402,7 @@ func (cm *ConsensusManager) voteProposal(rs *RoundState) error {
|
||||
return err
|
||||
}
|
||||
// Vote for block.
|
||||
err = cm.signAndVote(&Vote{
|
||||
err = conA.signAndVote(&Vote{
|
||||
Height: rs.Height,
|
||||
Round: rs.Round,
|
||||
Type: VoteTypeBare,
|
||||
@@ -413,7 +413,7 @@ func (cm *ConsensusManager) voteProposal(rs *RoundState) error {
|
||||
|
||||
// Precommit proposal if we see enough votes for it.
|
||||
// Call during transition from RoundStepVote to RoundStepPrecommit.
|
||||
func (cm *ConsensusManager) precommitProposal(rs *RoundState) error {
|
||||
func (conA *ConsensusAgent) precommitProposal(rs *RoundState) error {
|
||||
// If we see a 2/3 majority for votes for a block, precommit.
|
||||
|
||||
// TODO: maybe could use commitTime here and avg it with later commitTime?
|
||||
@@ -426,16 +426,16 @@ func (cm *ConsensusManager) precommitProposal(rs *RoundState) error {
|
||||
|
||||
// If proposal is invalid or unknown, do nothing.
|
||||
// See note on ZombieValidators to see why.
|
||||
if cm.stageProposal(rs.Proposal) != nil {
|
||||
if conA.stageProposal(rs.Proposal) != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Lock this proposal.
|
||||
// NOTE: we're unlocking any prior locks.
|
||||
cm.cs.LockProposal(rs.Proposal)
|
||||
conA.conS.LockProposal(rs.Proposal)
|
||||
|
||||
// Send precommit vote.
|
||||
err := cm.signAndVote(&Vote{
|
||||
err := conA.signAndVote(&Vote{
|
||||
Height: rs.Height,
|
||||
Round: rs.Round,
|
||||
Type: VoteTypePrecommit,
|
||||
@@ -451,7 +451,7 @@ func (cm *ConsensusManager) precommitProposal(rs *RoundState) error {
|
||||
|
||||
// Commit or unlock.
|
||||
// Call after RoundStepPrecommit, after round has completely expired.
|
||||
func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) (commitTime time.Time, err error) {
|
||||
func (conA *ConsensusAgent) commitOrUnlockProposal(rs *RoundState) (commitTime time.Time, err error) {
|
||||
// If there exists a 2/3 majority of precommits.
|
||||
// Validate the block and commit.
|
||||
if hash, commitTime, ok := rs.RoundPrecommits.TwoThirdsMajority(); ok {
|
||||
@@ -460,13 +460,13 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) (commitTime t
|
||||
// do not commit.
|
||||
// TODO If we were just late to receive the block, when
|
||||
// do we actually get it? Document it.
|
||||
if cm.stageProposal(rs.Proposal) != nil {
|
||||
if conA.stageProposal(rs.Proposal) != nil {
|
||||
return time.Time{}, nil
|
||||
}
|
||||
// TODO: Remove?
|
||||
cm.cs.LockProposal(rs.Proposal)
|
||||
conA.conS.LockProposal(rs.Proposal)
|
||||
// Vote commit.
|
||||
err := cm.signAndVote(&Vote{
|
||||
err := conA.signAndVote(&Vote{
|
||||
Height: rs.Height,
|
||||
Round: rs.Round,
|
||||
Type: VoteTypePrecommit,
|
||||
@@ -476,11 +476,11 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) (commitTime t
|
||||
return time.Time{}, err
|
||||
}
|
||||
// Commit block.
|
||||
cm.commitProposal(rs.Proposal, commitTime)
|
||||
conA.commitProposal(rs.Proposal, commitTime)
|
||||
return commitTime, nil
|
||||
} else {
|
||||
// Otherwise, if a 1/3 majority if a block that isn't our locked one exists, unlock.
|
||||
locked := cm.cs.LockedProposal()
|
||||
locked := conA.conS.LockedProposal()
|
||||
if locked != nil {
|
||||
for _, hashOrNil := range rs.RoundPrecommits.OneThirdMajority() {
|
||||
if hashOrNil == nil {
|
||||
@@ -488,7 +488,7 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) (commitTime t
|
||||
}
|
||||
if !bytes.Equal(hashOrNil, locked.Block().Hash()) {
|
||||
// Unlock our lock.
|
||||
cm.cs.LockProposal(nil)
|
||||
conA.conS.LockProposal(nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -496,50 +496,50 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) (commitTime t
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *ConsensusManager) commitProposal(proposal *BlockPartSet, commitTime time.Time) error {
|
||||
cm.mtx.Lock()
|
||||
defer cm.mtx.Unlock()
|
||||
func (conA *ConsensusAgent) commitProposal(proposal *BlockPartSet, commitTime time.Time) error {
|
||||
conA.mtx.Lock()
|
||||
defer conA.mtx.Unlock()
|
||||
|
||||
if cm.stagedProposal != proposal {
|
||||
if conA.stagedProposal != proposal {
|
||||
panic("Unexpected stagedProposal.") // Shouldn't happen.
|
||||
}
|
||||
|
||||
// Save to blockStore
|
||||
block, blockParts := proposal.Block(), proposal.BlockParts()
|
||||
err := cm.blockStore.SaveBlockParts(block.Height, blockParts)
|
||||
err := conA.blockStore.SaveBlockParts(block.Height, blockParts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// What was staged becomes committed.
|
||||
cm.state = cm.stagedState
|
||||
cm.state.Save(commitTime)
|
||||
cm.cs.Update(cm.state)
|
||||
cm.stagedProposal = nil
|
||||
cm.stagedState = nil
|
||||
conA.state = conA.stagedState
|
||||
conA.state.Save(commitTime)
|
||||
conA.conS.Update(conA.state)
|
||||
conA.stagedProposal = nil
|
||||
conA.stagedState = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Given a RoundState where we are the proposer,
|
||||
// broadcast rs.proposal to all the peers.
|
||||
func (cm *ConsensusManager) shareProposal(rs *RoundState) {
|
||||
privValidator := cm.PrivValidator()
|
||||
func (conA *ConsensusAgent) shareProposal(rs *RoundState) {
|
||||
privValidator := conA.PrivValidator()
|
||||
proposal := rs.Proposal
|
||||
if privValidator == nil || proposal == nil {
|
||||
return
|
||||
}
|
||||
privValidator.SignProposal(rs.Round, proposal)
|
||||
blockParts := proposal.BlockParts()
|
||||
peers := cm.sw.Peers().List()
|
||||
peers := conA.sw.Peers().List()
|
||||
if len(peers) == 0 {
|
||||
log.Warning("Could not propose: no peers")
|
||||
return
|
||||
}
|
||||
numBlockParts := uint16(len(blockParts))
|
||||
kbpMsg := cm.makeKnownBlockPartsMessage(rs)
|
||||
kbpMsg := conA.makeKnownBlockPartsMessage(rs)
|
||||
for i, peer := range peers {
|
||||
peerState := cm.getPeerState(peer)
|
||||
peerState := conA.getPeerState(peer)
|
||||
if !peerState.IsConnected() {
|
||||
continue // Peer was disconnected.
|
||||
}
|
||||
@@ -554,7 +554,7 @@ func (cm *ConsensusManager) shareProposal(rs *RoundState) {
|
||||
for i := uint16(0); i < numBlockParts; i++ {
|
||||
part := blockParts[(startIndex+i)%numBlockParts]
|
||||
// Ensure round hasn't expired on our end.
|
||||
currentRS := cm.cs.RoundState()
|
||||
currentRS := conA.conS.RoundState()
|
||||
if currentRS != rs {
|
||||
return
|
||||
}
|
||||
@@ -571,14 +571,14 @@ func (cm *ConsensusManager) shareProposal(rs *RoundState) {
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *ConsensusManager) gossipVoteRoutine() {
|
||||
func (conA *ConsensusAgent) gossipVoteRoutine() {
|
||||
OUTER_LOOP:
|
||||
for {
|
||||
// Get round state
|
||||
rs := cm.cs.RoundState()
|
||||
rs := conA.conS.RoundState()
|
||||
|
||||
// Receive incoming message on VoteCh
|
||||
inMsg, ok := cm.sw.Receive(VoteCh)
|
||||
inMsg, ok := conA.sw.Receive(VoteCh)
|
||||
if !ok {
|
||||
break // Client has stopped
|
||||
}
|
||||
@@ -622,8 +622,8 @@ OUTER_LOOP:
|
||||
}
|
||||
|
||||
// Gossip vote.
|
||||
for _, peer := range cm.sw.Peers().List() {
|
||||
peerState := cm.getPeerState(peer)
|
||||
for _, peer := range conA.sw.Peers().List() {
|
||||
peerState := conA.getPeerState(peer)
|
||||
wantsVote, unsolicited := peerState.WantsVote(vote)
|
||||
if wantsVote {
|
||||
if unsolicited {
|
||||
@@ -641,7 +641,7 @@ OUTER_LOOP:
|
||||
case *VoteRankMessage:
|
||||
msg := msg_.(*VoteRankMessage)
|
||||
|
||||
peerState := cm.getPeerState(inMsg.MConn.Peer)
|
||||
peerState := conA.getPeerState(inMsg.MConn.Peer)
|
||||
if !peerState.IsConnected() {
|
||||
// Peer disconnected before we were able to process.
|
||||
continue OUTER_LOOP
|
||||
@@ -650,7 +650,7 @@ OUTER_LOOP:
|
||||
|
||||
default:
|
||||
// Ignore unknown message
|
||||
// cm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage)
|
||||
// conA.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -666,17 +666,17 @@ type RoundAction struct {
|
||||
// Source of all round state transitions and votes.
|
||||
// It can be preemptively woken up via amessage to
|
||||
// doActionCh.
|
||||
func (cm *ConsensusManager) proposeAndVoteRoutine() {
|
||||
func (conA *ConsensusAgent) proposeAndVoteRoutine() {
|
||||
|
||||
// Figure out when to wake up next (in the absence of other events)
|
||||
setAlarm := func() {
|
||||
if len(cm.doActionCh) > 0 {
|
||||
if len(conA.doActionCh) > 0 {
|
||||
return // Already going to wake up later.
|
||||
}
|
||||
|
||||
// Figure out which height/round/step we're at,
|
||||
// then schedule an action for when it is due.
|
||||
rs := cm.cs.RoundState()
|
||||
rs := conA.conS.RoundState()
|
||||
_, _, roundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime)
|
||||
switch rs.Step() {
|
||||
case RoundStepStart:
|
||||
@@ -685,19 +685,19 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() {
|
||||
// startTime is in the future.
|
||||
time.Sleep(time.Duration(-1.0*elapsedRatio) * roundDuration)
|
||||
}
|
||||
cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepProposal}
|
||||
conA.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepProposal}
|
||||
case RoundStepProposal:
|
||||
// Wake up when it's time to vote.
|
||||
time.Sleep(time.Duration(roundDeadlineBare-elapsedRatio) * roundDuration)
|
||||
cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepBareVotes}
|
||||
conA.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepBareVotes}
|
||||
case RoundStepBareVotes:
|
||||
// Wake up when it's time to precommit.
|
||||
time.Sleep(time.Duration(roundDeadlinePrecommit-elapsedRatio) * roundDuration)
|
||||
cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepPrecommits}
|
||||
conA.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepPrecommits}
|
||||
case RoundStepPrecommits:
|
||||
// Wake up when the round is over.
|
||||
time.Sleep(time.Duration(1.0-elapsedRatio) * roundDuration)
|
||||
cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepCommitOrUnlock}
|
||||
conA.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepCommitOrUnlock}
|
||||
case RoundStepCommitOrUnlock:
|
||||
// This shouldn't happen.
|
||||
// Before setAlarm() got called,
|
||||
@@ -708,7 +708,7 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() {
|
||||
|
||||
for {
|
||||
func() {
|
||||
roundAction := <-cm.doActionCh
|
||||
roundAction := <-conA.doActionCh
|
||||
// Always set the alarm after any processing below.
|
||||
defer setAlarm()
|
||||
|
||||
@@ -718,21 +718,21 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() {
|
||||
// We only consider transitioning to given step.
|
||||
step := roundAction.XnToStep
|
||||
// This is the current state.
|
||||
rs := cm.cs.RoundState()
|
||||
rs := conA.conS.RoundState()
|
||||
if height != rs.Height || round != rs.Round {
|
||||
return // Not relevant.
|
||||
}
|
||||
|
||||
if step == RoundStepProposal && rs.Step() == RoundStepStart {
|
||||
// Propose a block if I am the proposer.
|
||||
privValidator := cm.PrivValidator()
|
||||
privValidator := conA.PrivValidator()
|
||||
if privValidator != nil && rs.Proposer.Account.Id == privValidator.Id {
|
||||
// If we're already locked on a proposal, use that.
|
||||
proposal := cm.cs.LockedProposal()
|
||||
proposal := conA.conS.LockedProposal()
|
||||
if proposal != nil {
|
||||
// Otherwise, construct a new proposal.
|
||||
var err error
|
||||
proposal, err = cm.constructProposal(rs)
|
||||
proposal, err = conA.constructProposal(rs)
|
||||
if err != nil {
|
||||
log.Error("Error attempting to construct a proposal: %v", err)
|
||||
return // Pretend like we weren't the proposer. Shrug.
|
||||
@@ -743,31 +743,31 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() {
|
||||
// Share the parts.
|
||||
// We send all parts to all of our peers, but everyone receives parts
|
||||
// starting at a different index, wrapping around back to 0.
|
||||
cm.shareProposal(rs)
|
||||
conA.shareProposal(rs)
|
||||
}
|
||||
} else if step == RoundStepBareVotes && rs.Step() <= RoundStepProposal {
|
||||
err := cm.voteProposal(rs)
|
||||
err := conA.voteProposal(rs)
|
||||
if err != nil {
|
||||
log.Info("Error attempting to vote for proposal: %v", err)
|
||||
}
|
||||
} else if step == RoundStepPrecommits && rs.Step() <= RoundStepBareVotes {
|
||||
err := cm.precommitProposal(rs)
|
||||
err := conA.precommitProposal(rs)
|
||||
if err != nil {
|
||||
log.Info("Error attempting to precommit for proposal: %v", err)
|
||||
}
|
||||
} else if step == RoundStepCommitOrUnlock && rs.Step() <= RoundStepPrecommits {
|
||||
commitTime, err := cm.commitOrUnlockProposal(rs)
|
||||
commitTime, err := conA.commitOrUnlockProposal(rs)
|
||||
if err != nil {
|
||||
log.Info("Error attempting to commit or update for proposal: %v", err)
|
||||
}
|
||||
|
||||
if !commitTime.IsZero() {
|
||||
// We already set up ConsensusState for the next height
|
||||
// (it happens in the call to cm.commitProposal).
|
||||
// (it happens in the call to conA.commitProposal).
|
||||
} else {
|
||||
// Round is over. This is a special case.
|
||||
// Prepare a new RoundState for the next state.
|
||||
cm.cs.SetupRound(rs.Round + 1)
|
||||
conA.conS.SetupRound(rs.Round + 1)
|
||||
return // setAlarm() takes care of the rest.
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -55,7 +55,7 @@ func (cs *ConsensusState) RoundState() *RoundState {
|
||||
return cs.roundState
|
||||
}
|
||||
|
||||
// Primarily gets called upon block commit by ConsensusManager.
|
||||
// Primarily gets called upon block commit by ConsensusAgent.
|
||||
func (cs *ConsensusState) Update(state *State) {
|
||||
cs.mtx.Lock()
|
||||
defer cs.mtx.Unlock()
|
||||
|
||||
22
main.go
22
main.go
@@ -10,10 +10,10 @@ import (
|
||||
)
|
||||
|
||||
type Node struct {
|
||||
lz []p2p.Listener
|
||||
sw *p2p.Switch
|
||||
book *p2p.AddrBook
|
||||
pmgr *p2p.PeerManager
|
||||
lz []p2p.Listener
|
||||
sw *p2p.Switch
|
||||
book *p2p.AddrBook
|
||||
pexAgent *p2p.PEXAgent
|
||||
}
|
||||
|
||||
func NewNode() *Node {
|
||||
@@ -53,12 +53,12 @@ func NewNode() *Node {
|
||||
}
|
||||
sw := p2p.NewSwitch(chDescs)
|
||||
book := p2p.NewAddrBook(config.RootDir + "/addrbook.json")
|
||||
pmgr := p2p.NewPeerManager(sw, book)
|
||||
pexAgent := p2p.NewPEXAgent(sw, book)
|
||||
|
||||
return &Node{
|
||||
sw: sw,
|
||||
book: book,
|
||||
pmgr: pmgr,
|
||||
sw: sw,
|
||||
book: book,
|
||||
pexAgent: pexAgent,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -69,7 +69,7 @@ func (n *Node) Start() {
|
||||
}
|
||||
n.sw.Start()
|
||||
n.book.Start()
|
||||
n.pmgr.Start()
|
||||
n.pexAgent.Start()
|
||||
}
|
||||
|
||||
func (n *Node) Stop() {
|
||||
@@ -77,7 +77,7 @@ func (n *Node) Stop() {
|
||||
// TODO: gracefully disconnect from peers.
|
||||
n.sw.Stop()
|
||||
n.book.Stop()
|
||||
n.pmgr.Stop()
|
||||
n.pexAgent.Stop()
|
||||
}
|
||||
|
||||
// Add a Listener to accept inbound peer connections.
|
||||
@@ -102,7 +102,7 @@ func (n *Node) inboundConnectionRoutine(l p2p.Listener) {
|
||||
}
|
||||
// NOTE: We don't yet have the external address of the
|
||||
// remote (if they have a listener at all).
|
||||
// PeerManager's pexRoutine will handle that.
|
||||
// PEXAgent's pexRoutine will handle that.
|
||||
}
|
||||
|
||||
// cleanup
|
||||
|
||||
127
mempool/agent.go
Normal file
127
mempool/agent.go
Normal file
@@ -0,0 +1,127 @@
|
||||
package mempol
|
||||
|
||||
import (
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
)
|
||||
|
||||
type MempoolAgent struct {
|
||||
sw *p2p.Switch
|
||||
swEvents chan interface{}
|
||||
quit chan struct{}
|
||||
started uint32
|
||||
stopped uint32
|
||||
}
|
||||
|
||||
func NewMempoolAgent(sw *p2p.Switch) *MempoolAgent {
|
||||
swEvents := make(chan interface{})
|
||||
sw.AddEventListener("MempoolAgent.swEvents", swEvents)
|
||||
memA := &MempoolAgent{
|
||||
sw: sw,
|
||||
swEvents: swEvents,
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
return memA
|
||||
}
|
||||
|
||||
func (memA *MempoolAgent) Start() {
|
||||
if atomic.CompareAndSwapUint32(&memA.started, 0, 1) {
|
||||
log.Info("Starting MempoolAgent")
|
||||
go memA.switchEventsRoutine()
|
||||
go memA.gossipTxRoutine()
|
||||
}
|
||||
}
|
||||
|
||||
func (memA *MempoolAgent) Stop() {
|
||||
if atomic.CompareAndSwapUint32(&memA.stopped, 0, 1) {
|
||||
log.Info("Stopping MempoolAgent")
|
||||
close(memA.quit)
|
||||
close(memA.swEvents)
|
||||
}
|
||||
}
|
||||
|
||||
// Handle peer new/done events
|
||||
func (memA *MempoolAgent) switchEventsRoutine() {
|
||||
for {
|
||||
swEvent, ok := <-memA.swEvents
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
switch swEvent.(type) {
|
||||
case p2p.SwitchEventNewPeer:
|
||||
// event := swEvent.(p2p.SwitchEventNewPeer)
|
||||
case p2p.SwitchEventDonePeer:
|
||||
// event := swEvent.(p2p.SwitchEventDonePeer)
|
||||
default:
|
||||
log.Warning("Unhandled switch event type")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (memA *MempoolAgent) gossipTxRoutine() {
|
||||
OUTER_LOOP:
|
||||
for {
|
||||
// Receive incoming message on ProposalCh
|
||||
inMsg, ok := memA.sw.Receive(ProposalCh)
|
||||
if !ok {
|
||||
break OUTER_LOOP // Client has stopped
|
||||
}
|
||||
_, msg_ := decodeMessage(inMsg.Bytes)
|
||||
log.Info("gossipProposalRoutine received %v", msg_)
|
||||
|
||||
switch msg_.(type) {
|
||||
case *TxMessage:
|
||||
// msg := msg_.(*TxMessage)
|
||||
// XXX
|
||||
|
||||
default:
|
||||
// Ignore unknown message
|
||||
// memA.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage)
|
||||
}
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Messages
|
||||
|
||||
const (
|
||||
msgTypeUnknown = byte(0x00)
|
||||
msgTypeTx = byte(0x10)
|
||||
)
|
||||
|
||||
// TODO: check for unnecessary extra bytes at the end.
|
||||
func decodeMessage(bz []byte) (msgType byte, msg interface{}) {
|
||||
n, err := new(int64), new(error)
|
||||
// log.Debug("decoding msg bytes: %X", bz)
|
||||
msgType = bz[0]
|
||||
switch msgType {
|
||||
case msgTypeTx:
|
||||
msg = readTxMessage(bytes.NewReader(bz[1:]), n, err)
|
||||
default:
|
||||
msg = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
type TxMessage struct {
|
||||
Tx Tx
|
||||
}
|
||||
|
||||
func readTxMessage(r io.Reader, n *int64, err *error) *TxMessage {
|
||||
return &TxMessage{
|
||||
Tx: ReadTx(r, n, err),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *TxMessage) WriteTo(w io.Writer) (n int64, err error) {
|
||||
WriteByte(w, msgTypeTx, &n, &err)
|
||||
WriteBinary(w, m.Tx, &n, &err)
|
||||
return
|
||||
}
|
||||
|
||||
func (m *TxMessage) String() string {
|
||||
return fmt.Sprintf("[TxMessage %v]", m.Tx)
|
||||
}
|
||||
@@ -22,10 +22,10 @@ const (
|
||||
)
|
||||
|
||||
/*
|
||||
PeerManager handles PEX (peer exchange) and ensures that an
|
||||
PEXAgent handles PEX (peer exchange) and ensures that an
|
||||
adequate number of peers are connected to the switch.
|
||||
*/
|
||||
type PeerManager struct {
|
||||
type PEXAgent struct {
|
||||
sw *Switch
|
||||
swEvents chan interface{}
|
||||
quit chan struct{}
|
||||
@@ -35,49 +35,49 @@ type PeerManager struct {
|
||||
book *AddrBook
|
||||
}
|
||||
|
||||
func NewPeerManager(sw *Switch, book *AddrBook) *PeerManager {
|
||||
func NewPEXAgent(sw *Switch, book *AddrBook) *PEXAgent {
|
||||
swEvents := make(chan interface{})
|
||||
sw.AddEventListener("PeerManager.swEvents", swEvents)
|
||||
pm := &PeerManager{
|
||||
sw.AddEventListener("PEXAgent.swEvents", swEvents)
|
||||
pexA := &PEXAgent{
|
||||
sw: sw,
|
||||
swEvents: swEvents,
|
||||
quit: make(chan struct{}),
|
||||
book: book,
|
||||
}
|
||||
return pm
|
||||
return pexA
|
||||
}
|
||||
|
||||
func (pm *PeerManager) Start() {
|
||||
if atomic.CompareAndSwapUint32(&pm.started, 0, 1) {
|
||||
log.Info("Starting PeerManager")
|
||||
go pm.switchEventsRoutine()
|
||||
go pm.requestRoutine()
|
||||
go pm.ensurePeersRoutine()
|
||||
func (pexA *PEXAgent) Start() {
|
||||
if atomic.CompareAndSwapUint32(&pexA.started, 0, 1) {
|
||||
log.Info("Starting PEXAgent")
|
||||
go pexA.switchEventsRoutine()
|
||||
go pexA.requestRoutine()
|
||||
go pexA.ensurePeersRoutine()
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *PeerManager) Stop() {
|
||||
if atomic.CompareAndSwapUint32(&pm.stopped, 0, 1) {
|
||||
log.Info("Stopping PeerManager")
|
||||
close(pm.quit)
|
||||
close(pm.swEvents)
|
||||
func (pexA *PEXAgent) Stop() {
|
||||
if atomic.CompareAndSwapUint32(&pexA.stopped, 0, 1) {
|
||||
log.Info("Stopping PEXAgent")
|
||||
close(pexA.quit)
|
||||
close(pexA.swEvents)
|
||||
}
|
||||
}
|
||||
|
||||
// Asks peer for more addresses.
|
||||
func (pm *PeerManager) RequestPEX(peer *Peer) {
|
||||
func (pexA *PEXAgent) RequestPEX(peer *Peer) {
|
||||
peer.TrySend(PexCh, &pexRequestMessage{})
|
||||
}
|
||||
|
||||
func (pm *PeerManager) SendAddrs(peer *Peer, addrs []*NetAddress) {
|
||||
func (pexA *PEXAgent) SendAddrs(peer *Peer, addrs []*NetAddress) {
|
||||
peer.Send(PexCh, &pexAddrsMessage{Addrs: addrs})
|
||||
}
|
||||
|
||||
// For new outbound peers, announce our listener addresses if any,
|
||||
// and if .book needs more addresses, ask for them.
|
||||
func (pm *PeerManager) switchEventsRoutine() {
|
||||
func (pexA *PEXAgent) switchEventsRoutine() {
|
||||
for {
|
||||
swEvent, ok := <-pm.swEvents
|
||||
swEvent, ok := <-pexA.swEvents
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
@@ -85,9 +85,9 @@ func (pm *PeerManager) switchEventsRoutine() {
|
||||
case SwitchEventNewPeer:
|
||||
event := swEvent.(SwitchEventNewPeer)
|
||||
if event.Peer.IsOutbound() {
|
||||
pm.SendAddrs(event.Peer, pm.book.OurAddresses())
|
||||
if pm.book.NeedMoreAddrs() {
|
||||
pm.RequestPEX(event.Peer)
|
||||
pexA.SendAddrs(event.Peer, pexA.book.OurAddresses())
|
||||
if pexA.book.NeedMoreAddrs() {
|
||||
pexA.RequestPEX(event.Peer)
|
||||
}
|
||||
}
|
||||
case SwitchEventDonePeer:
|
||||
@@ -97,17 +97,17 @@ func (pm *PeerManager) switchEventsRoutine() {
|
||||
}
|
||||
|
||||
// Ensures that sufficient peers are connected. (continuous)
|
||||
func (pm *PeerManager) ensurePeersRoutine() {
|
||||
func (pexA *PEXAgent) ensurePeersRoutine() {
|
||||
// fire once immediately.
|
||||
pm.ensurePeers()
|
||||
pexA.ensurePeers()
|
||||
// fire periodically
|
||||
timer := NewRepeatTimer(ensurePeersPeriodSeconds * time.Second)
|
||||
FOR_LOOP:
|
||||
for {
|
||||
select {
|
||||
case <-timer.Ch:
|
||||
pm.ensurePeers()
|
||||
case <-pm.quit:
|
||||
pexA.ensurePeers()
|
||||
case <-pexA.quit:
|
||||
break FOR_LOOP
|
||||
}
|
||||
}
|
||||
@@ -117,8 +117,8 @@ FOR_LOOP:
|
||||
}
|
||||
|
||||
// Ensures that sufficient peers are connected. (once)
|
||||
func (pm *PeerManager) ensurePeers() {
|
||||
numOutPeers, _, numDialing := pm.sw.NumPeers()
|
||||
func (pexA *PEXAgent) ensurePeers() {
|
||||
numOutPeers, _, numDialing := pexA.sw.NumPeers()
|
||||
numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
|
||||
if numToDial <= 0 {
|
||||
return
|
||||
@@ -133,13 +133,13 @@ func (pm *PeerManager) ensurePeers() {
|
||||
// Try to fetch a new peer 3 times.
|
||||
// This caps the maximum number of tries to 3 * numToDial.
|
||||
for j := 0; i < 3; j++ {
|
||||
picked = pm.book.PickAddress(newBias)
|
||||
picked = pexA.book.PickAddress(newBias)
|
||||
if picked == nil {
|
||||
return
|
||||
}
|
||||
if toDial.Has(picked.String()) ||
|
||||
pm.sw.IsDialing(picked) ||
|
||||
pm.sw.Peers().Has(picked.String()) {
|
||||
pexA.sw.IsDialing(picked) ||
|
||||
pexA.sw.Peers().Has(picked.String()) {
|
||||
continue
|
||||
} else {
|
||||
break
|
||||
@@ -155,19 +155,19 @@ func (pm *PeerManager) ensurePeers() {
|
||||
for _, item := range toDial.Values() {
|
||||
picked := item.(*NetAddress)
|
||||
go func() {
|
||||
_, err := pm.sw.DialPeerWithAddress(picked)
|
||||
_, err := pexA.sw.DialPeerWithAddress(picked)
|
||||
if err != nil {
|
||||
pm.book.MarkAttempt(picked)
|
||||
pexA.book.MarkAttempt(picked)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// Handles incoming PEX messages.
|
||||
func (pm *PeerManager) requestRoutine() {
|
||||
func (pexA *PEXAgent) requestRoutine() {
|
||||
|
||||
for {
|
||||
inMsg, ok := pm.sw.Receive(PexCh) // {Peer, Time, Packet}
|
||||
inMsg, ok := pexA.sw.Receive(PexCh) // {Peer, Time, Packet}
|
||||
if !ok {
|
||||
// Client has stopped
|
||||
break
|
||||
@@ -181,7 +181,7 @@ func (pm *PeerManager) requestRoutine() {
|
||||
case *pexRequestMessage:
|
||||
// inMsg.MConn.Peer requested some peers.
|
||||
// TODO: prevent abuse.
|
||||
addrs := pm.book.GetSelection()
|
||||
addrs := pexA.book.GetSelection()
|
||||
msg := &pexAddrsMessage{Addrs: addrs}
|
||||
queued := inMsg.MConn.Peer.TrySend(PexCh, msg)
|
||||
if !queued {
|
||||
@@ -193,11 +193,11 @@ func (pm *PeerManager) requestRoutine() {
|
||||
// (We don't want to get spammed with bad peers)
|
||||
srcAddr := inMsg.MConn.RemoteAddress
|
||||
for _, addr := range msg.(*pexAddrsMessage).Addrs {
|
||||
pm.book.AddAddress(addr, srcAddr)
|
||||
pexA.book.AddAddress(addr, srcAddr)
|
||||
}
|
||||
default:
|
||||
// Ignore unknown message.
|
||||
// pm.sw.StopPeerForError(inMsg.MConn.Peer, pexErrInvalidMessage)
|
||||
// pexA.sw.StopPeerForError(inMsg.MConn.Peer, pexErrInvalidMessage)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ func LoadState(db db_.Db) *State {
|
||||
}
|
||||
|
||||
// Save this state into the db.
|
||||
// For convenience, the commitTime (required by ConsensusManager)
|
||||
// For convenience, the commitTime (required by ConsensusAgent)
|
||||
// is saved here.
|
||||
func (s *State) Save(commitTime time.Time) {
|
||||
s.mtx.Lock()
|
||||
|
||||
Reference in New Issue
Block a user