diff --git a/cmd/main.go b/cmd/main.go index fe451fa48..e6b42ddf1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -29,16 +29,7 @@ Commands: case "daemon": config.ParseFlags(args[1:]) logger.Reset() - var deborable daemon.DeboraMode = daemon.DeboraNullMode - if len(args) > 1 { - switch args[1] { - case "debora": - deborable = daemon.DeboraPeerMode - case "dev": - deborable = daemon.DeboraDevMode - } - } - daemon.Daemon(deborable) + daemon.Daemon() case "gen_account": gen_account() case "gen_validator": diff --git a/config/config.go b/config/config.go index d7b8a0115..a7e4d5d49 100644 --- a/config/config.go +++ b/config/config.go @@ -35,22 +35,22 @@ func SetApp(a *confer.Config) { var defaultConfig = `# This is a TOML config file. # For more information, see https://github.com/toml-lang/toml -Network = "tendermint_testnet0" -ListenAddr = "0.0.0.0:8080" +Network = "tendermint_testnet0" +ListenAddr = "0.0.0.0:8080" # First node to connect to. Command-line overridable. -SeedNode = "188.166.55.222:8080" +SeedNode = "188.166.55.222:8080" [DB] # The only other available backend is "memdb" -Backend = "leveldb" -# Dir = "~/.tendermint/data" +Backend = "leveldb" +# Dir = "~/.tendermint/data" [Log.Stdout] -Level = "info" +Level = "info" [Log.File] -Level = "debug" -# Dir = "~/.tendermint/log" +Level = "debug" +# Dir = "~/.tendermint/log" [RPC.HTTP] # For the RPC API HTTP server. Port required. @@ -100,8 +100,6 @@ func initDefaults(rootDir string) { app.SetDefault("PrivValidatorfile", rootDir+"/priv_validator.json") app.SetDefault("FastSync", false) - - app.SetDefault("Debora.LogFile", rootDir+"/debora.log") } func Init(rootDir string) { diff --git a/consensus/state.go b/consensus/state.go index 8756df612..c036fa3b4 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -847,6 +847,7 @@ func (cs *ConsensusState) TryFinalizeCommit(height uint) bool { } hash, header, _ := cs.Commits.TwoThirdsMajority() if !cs.ProposalBlock.HashesTo(hash) { + // XXX See: https://github.com/tendermint/tendermint/issues/44 panic(Fmt("Expected ProposalBlock to hash to commit hash. Expected %X, got %X", hash, cs.ProposalBlock.Hash())) } if !cs.ProposalBlockParts.HasHeader(header) { diff --git a/daemon/daemon.go b/daemon/daemon.go index d3ecf1d26..419c3786f 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -4,7 +4,6 @@ import ( "os" "os/signal" - "github.com/ebuchman/debora" bc "github.com/tendermint/tendermint/blockchain" . "github.com/tendermint/tendermint/common" "github.com/tendermint/tendermint/config" @@ -92,11 +91,10 @@ func NewNode() *Node { func (n *Node) Start() { log.Info("Starting Node") n.book.Start() - n.sw.Reactor("PEX").Start(n.sw) - n.sw.Reactor("MEMPOOL").Start(n.sw) - n.sw.Reactor("BLOCKCHAIN").Start(n.sw) - if !config.App().GetBool("FastSync") { - n.sw.Reactor("CONSENSUS").Start(n.sw) + n.sw.Start() + if config.App().GetBool("FastSync") { + // TODO: When FastSync is done, start CONSENSUS. + n.sw.Reactor("CONSENSUS").Stop() } } @@ -114,27 +112,6 @@ func (n *Node) AddListener(l p2p.Listener) { n.book.AddOurAddress(l.ExternalAddress()) } -func (n *Node) inboundConnectionRoutine(l p2p.Listener) { - for { - inConn, ok := <-l.Connections() - if !ok { - break - } - // New inbound connection! - peer, err := n.sw.AddPeerWithConnection(inConn, false) - if err != nil { - log.Info("Ignoring error from inbound connection: %v\n%v", - peer, err) - continue - } - // NOTE: We don't yet have the external address of the - // remote (if they have a listener at all). - // PEXReactor's pexRoutine will handle that. - } - - // cleanup -} - func (n *Node) DialSeed() { addr := p2p.NewNetAddressString(config.App().GetString("SeedNode")) peer, err := n.sw.DialPeerWithAddress(addr) @@ -170,50 +147,13 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor { //------------------------------------------------------------------------------ -// debora variables -var ( - AppName = "tendermint" - SrcPath = "github.com/tendermint/tendermint/cmd" - PublicKey = "30820122300d06092a864886f70d01010105000382010f003082010a0282010100d1ffab251e05c0cae7bdd5f94c1b9644d4eb66847ee2e9a622b09e0228f2e70e6fecd1dfe6b3dc59fefab1abf0ff4e5d9657541cbe697ab1cf23fb26c9b857f6b6db8b67a0223b514ca77c8f1e049eaf9477d1a5f8041d045eeb0253a3c1ff7b90150d9b5c814a8d05fb707dd35aac118d5457334a557a82579f727a8bed521b0895b73da2458ffd1fc4be91adb624cc25731194d491f23ed47bf9a7265d28b23885e8a70625772eeeaf8e56ff3a1a2f33934668cc3a874042711f8b386da7842c117441a4d6ed29a182a00499ed5d4b6ce9532c5468d3976991f66d595a6f361e29cdf7750cf1c21e583e4c2207334c8d33ead731bf1172793b176089978b110203010001" - - DeboraCallPort = 56565 -) - -type DeboraMode int - -const ( - DeboraNullMode = iota // debora off by default - DeboraPeerMode // upgradeable - DeboraDevMode // upgrader -) - -func deboraBroadcast(n *Node) func([]byte) { - return func(payload []byte) { - msg := &p2p.PexDeboraMessage{Payload: payload} - n.sw.Broadcast(p2p.PexChannel, msg) - } -} - -func Daemon(deborable DeboraMode) { - // Add to debora - if deborable == DeboraPeerMode { - // TODO: support debora.logfile - if err := debora.Add(PublicKey, SrcPath, AppName, config.App().GetString("Debora.LogFile")); err != nil { - log.Info("Failed to add program to debora", "error", err) - } - } - +func Daemon() { // Create & start node n := NewNode() l := p2p.NewDefaultListener("tcp", config.App().GetString("ListenAddr"), false) n.AddListener(l) n.Start() - if deborable == DeboraDevMode { - log.Info("Running debora-dev server (listen to call)") - debora.DebListenAndServe("tendermint", DeboraCallPort, deboraBroadcast(n)) - } - // If seedNode is provided by config, dial out. if config.App().GetString("SeedNode") != "" { n.DialSeed() diff --git a/p2p/connection.go b/p2p/connection.go index c75538365..4de0a159f 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -385,7 +385,7 @@ FOR_LOOP: c.recvMonitor.Update(int(n)) if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { - log.Warn("Connection failed @ recvRoutine", "connection", c, "error", err) + log.Warn("Connection failed @ recvRoutine (reading byte)", "connection", c, "error", err) c.stopForError(err) } break FOR_LOOP diff --git a/p2p/listener.go b/p2p/listener.go index f10e3199f..cc27c0c77 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -13,6 +13,7 @@ import ( type Listener interface { Connections() <-chan net.Conn ExternalAddress() *NetAddress + String() string Stop() } diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 474676bda..6af070589 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -7,7 +7,6 @@ import ( "sync/atomic" "time" - "github.com/ebuchman/debora" "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" ) @@ -116,13 +115,6 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) { for _, addr := range msg.(*pexAddrsMessage).Addrs { pexR.book.AddAddress(addr, srcAddr) } - case *PexDeboraMessage: - srcAddr := src.Connection().RemoteAddress.String() - payload := msg.(*PexDeboraMessage).Payload - log.Info(fmt.Sprintf("Received debora msg with payload %s or %x", payload, payload)) - if err := debora.Call(srcAddr, payload); err != nil { - log.Info("Debora upgrade call failed.", "error", err) - } default: // Ignore unknown message. } @@ -223,7 +215,6 @@ const ( msgTypeRequest = byte(0x01) msgTypeAddrs = byte(0x02) msgTypeHandshake = byte(0x03) - msgTypeDebora = byte(0x04) ) // TODO: check for unnecessary extra bytes at the end. @@ -239,8 +230,6 @@ func DecodeMessage(bz []byte) (msg interface{}, err error) { msg = &pexRequestMessage{} case msgTypeAddrs: msg = binary.ReadBinary(&pexAddrsMessage{}, r, n, &err) - case msgTypeDebora: - msg = binary.ReadBinary(&PexDeboraMessage{}, r, n, &err) default: msg = nil } @@ -284,16 +273,3 @@ func (m *pexAddrsMessage) TypeByte() byte { return msgTypeAddrs } func (m *pexAddrsMessage) String() string { return fmt.Sprintf("[pexAddrs %v]", m.Addrs) } - -/* -A pexDeboraMessage requests the node to upgrade its source code -*/ -type PexDeboraMessage struct { - Payload []byte -} - -func (m *PexDeboraMessage) TypeByte() byte { return msgTypeDebora } - -func (m *PexDeboraMessage) String() string { - return "[pexDebora]" -} diff --git a/p2p/switch.go b/p2p/switch.go index 43b3b4ab0..cc60e4d3b 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "net" + "sync/atomic" "time" . "github.com/tendermint/tendermint/common" @@ -34,6 +35,7 @@ type Switch struct { reactorsByCh map[byte]Reactor peers *PeerSet dialing *CMap + running uint32 } var ( @@ -53,6 +55,7 @@ func NewSwitch() *Switch { reactorsByCh: make(map[byte]Reactor), peers: NewPeerSet(), dialing: NewCMap(), + running: 0, } return sw @@ -80,64 +83,66 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { return reactor } -func (sw *Switch) Reactor(name string) Reactor { - return sw.reactors[name] -} - -// Convenience function -func (sw *Switch) StartReactors() { - for _, reactor := range sw.reactors { - reactor.Start(sw) - } -} - -// Convenience function -func (sw *Switch) StopReactors() { - // Stop all reactors. - for _, reactor := range sw.reactors { - reactor.Stop() - } -} - -// Convenience function -func (sw *Switch) StopPeers() { - // Stop each peer. - for _, peer := range sw.peers.List() { - peer.stop() - } - sw.peers = NewPeerSet() -} - -// Convenience function -func (sw *Switch) StopListeners() { - // Stop each listener. - for _, listener := range sw.listeners { - listener.Stop() - } - sw.listeners = nil -} - -// Convenience function -func (sw *Switch) Stop() { - sw.StopPeers() - sw.StopReactors() - sw.StopListeners() -} - -// Not goroutine safe to modify. +// Not goroutine safe. func (sw *Switch) Reactors() map[string]Reactor { return sw.reactors } -func (sw *Switch) AddListener(l Listener) { - sw.listeners = append(sw.listeners, l) - go sw.listenerRoutine(l) +// Not goroutine safe. +func (sw *Switch) Reactor(name string) Reactor { + return sw.reactors[name] } +// Not goroutine safe. +func (sw *Switch) AddListener(l Listener) { + sw.listeners = append(sw.listeners, l) +} + +func (sw *Switch) Listeners() []Listener { + return sw.listeners +} + +// Not goroutine safe. func (sw *Switch) IsListening() bool { return len(sw.listeners) > 0 } +func (sw *Switch) Start() { + if atomic.CompareAndSwapUint32(&sw.running, 0, 1) { + // Start reactors + for _, reactor := range sw.reactors { + reactor.Start(sw) + } + // Start peers + for _, peer := range sw.peers.List() { + sw.startInitPeer(peer) + } + // Start listeners + for _, listener := range sw.listeners { + go sw.listenerRoutine(listener) + } + } +} + +func (sw *Switch) Stop() { + if atomic.CompareAndSwapUint32(&sw.running, 1, 0) { + // Stop listeners + for _, listener := range sw.listeners { + listener.Stop() + } + sw.listeners = nil + // Stop peers + for _, peer := range sw.peers.List() { + peer.stop() + } + sw.peers = NewPeerSet() + // Stop reactors + for _, reactor := range sw.reactors { + reactor.Stop() + } + } +} + func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) @@ -149,6 +154,13 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er return nil, ErrSwitchDuplicatePeer } + if atomic.LoadUint32(&sw.running) == 1 { + sw.startInitPeer(peer) + } + return peer, nil +} + +func (sw *Switch) startInitPeer(peer *Peer) { // Start the peer peer.start() @@ -158,8 +170,6 @@ func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, er // Send handshake msg := &pexHandshakeMessage{Network: sw.network} peer.Send(PexChannel, msg) - - return peer, nil } func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { diff --git a/p2p/switch_test.go b/p2p/switch_test.go index ffdb9950c..d66c039a3 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -74,6 +74,8 @@ func makeSwitchPair(t testing.TB, initSwitch func(*Switch) *Switch) (*Switch, *S // Create two switches that will be interconnected. s1 := initSwitch(NewSwitch()) s2 := initSwitch(NewSwitch()) + s1.Start() + s2.Start() // Create a listener for s1 l := NewDefaultListener("tcp", ":8001", true) diff --git a/rpc/core/net.go b/rpc/core/net.go index 64db01cfa..84ec575a0 100644 --- a/rpc/core/net.go +++ b/rpc/core/net.go @@ -31,9 +31,20 @@ func Status() (*ResponseStatus, error) { //----------------------------------------------------------------------------- func NetInfo() (*ResponseNetInfo, error) { - o, i, _ := p2pSwitch.NumPeers() - numPeers := o + i listening := p2pSwitch.IsListening() network := config.App().GetString("Network") - return &ResponseNetInfo{numPeers, listening, network}, nil + listeners := []string{} + for _, listener := range p2pSwitch.Listeners() { + listeners = append(listeners, listener.String()) + } + peers := []string{} + for _, peer := range p2pSwitch.Peers().List() { + peers = append(peers, peer.String()) + } + return &ResponseNetInfo{ + Network: network, + Listening: listening, + Listeners: listeners, + Peers: peers, + }, nil } diff --git a/rpc/core/responses.go b/rpc/core/responses.go index 41cb22847..a9f128fb0 100644 --- a/rpc/core/responses.go +++ b/rpc/core/responses.go @@ -64,9 +64,10 @@ type ResponseStatus struct { } type ResponseNetInfo struct { - NumPeers int - Listening bool Network string + Listening bool + Listeners []string + Peers []string } type ResponseSignTx struct { diff --git a/rpc/test/.tendermint/priv_validator.json b/rpc/test/.tendermint/priv_validator.json deleted file mode 100755 index 5429f5009..000000000 --- a/rpc/test/.tendermint/priv_validator.json +++ /dev/null @@ -1 +0,0 @@ -{"Address":"D7DFF9806078899C8DA3FE3633CC0BF3C6C2B1BB","PubKey":[1,"2239C21C81EA7173A6C489145490C015E05D4B97448933B708A7EC5B7B4921E3"],"PrivKey":[1,"FDE3BD94CB327D19464027BA668194C5EFA46AE83E8419D7542CFF41F00C81972239C21C81EA7173A6C489145490C015E05D4B97448933B708A7EC5B7B4921E3"],"LastHeight":2,"LastRound":0,"LastStep":2} \ No newline at end of file diff --git a/rpc/test/test.go b/rpc/test/test.go index 09bcd8fdb..0b31525da 100644 --- a/rpc/test/test.go +++ b/rpc/test/test.go @@ -29,10 +29,17 @@ var ( userAddr = "D7DFF9806078899C8DA3FE3633CC0BF3C6C2B1BB" userPriv = "FDE3BD94CB327D19464027BA668194C5EFA46AE83E8419D7542CFF41F00C81972239C21C81EA7173A6C489145490C015E05D4B97448933B708A7EC5B7B4921E3" - - userPub = "2239C21C81EA7173A6C489145490C015E05D4B97448933B708A7EC5B7B4921E3" + userPub = "2239C21C81EA7173A6C489145490C015E05D4B97448933B708A7EC5B7B4921E3" ) +func decodeHex(hexStr string) []byte { + bytes, err := hex.DecodeString(hexStr) + if err != nil { + panic(err) + } + return bytes +} + func newNode(ready chan struct{}) { // Create & start node node = daemon.NewNode() @@ -62,10 +69,13 @@ func init() { config.SetApp(app) logger.Reset() - priv := state.LoadPrivValidator(rootDir + "/priv_validator.json") - priv.LastHeight = 0 - priv.LastRound = 0 - priv.LastStep = 0 + // Save new priv_validator file. + priv := &state.PrivValidator{ + Address: decodeHex(userAddr), + PubKey: account.PubKeyEd25519(decodeHex(userPub)), + PrivKey: account.PrivKeyEd25519(decodeHex(userPriv)), + } + priv.SetFile(rootDir + "/priv_validator.json") priv.Save() // start a node diff --git a/state/priv_validator.go b/state/priv_validator.go index 733d1151c..17839a500 100644 --- a/state/priv_validator.go +++ b/state/priv_validator.go @@ -85,6 +85,12 @@ func LoadPrivValidator(filename string) *PrivValidator { return privVal } +func (privVal *PrivValidator) SetFile(filename string) { + privVal.mtx.Lock() + defer privVal.mtx.Unlock() + privVal.filename = filename +} + func (privVal *PrivValidator) Save() { privVal.mtx.Lock() defer privVal.mtx.Unlock()