From 6750d05b05de2fa9fae776d0b4118cd903d266c9 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Fri, 18 Jul 2014 21:21:42 -0700 Subject: [PATCH] Message is wrapped by TypedMessage. --- README.md | 5 +- binary/binary.go | 2 +- blocks/account.go | 6 +- blocks/adjustment.go | 30 ++++---- blocks/block.go | 52 ++++++------- blocks/block_manager.go | 163 ++++++++++++++++++++++++++++++++++++++++ blocks/codec_test.go | 12 +-- blocks/signature.go | 4 +- blocks/tx.go | 20 ++--- blocks/vote.go | 6 +- main.go | 8 +- p2p/msg.go | 85 +++++++++++++++++++++ p2p/netaddress.go | 4 +- p2p/peer.go | 56 -------------- p2p/peer_manager.go | 83 ++++++++++---------- 15 files changed, 365 insertions(+), 171 deletions(-) create mode 100644 blocks/block_manager.go diff --git a/README.md b/README.md index 747dbeb3d..9f71a04fd 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,13 @@ TenderMint - proof of concept -* **[peer](https://github.com/tendermint/tendermint/blob/master/peer):** P2P networking stack. Designed to be extensible. +* **[p2p](https://github.com/tendermint/tendermint/blob/master/p2p):** P2P networking stack. Designed to be extensible. * **[merkle](https://github.com/tendermint/tendermint/blob/master/merkle):** Immutable Persistent Merkle-ized AVL+ Tree, used primarily for keeping track of mutable state like account balances. * **[crypto](https://github.com/tendermint/tendermint/blob/master/crypto):** Includes cgo bindings of ed25519. ### Status -* Node & testnet *now* +* Block manager *now* +* Node & testnet *complete* * PEX peer exchange *complete* * p2p/* *complete* * Ed25519 bindings *complete* diff --git a/binary/binary.go b/binary/binary.go index 453079a45..3e4f107a4 100644 --- a/binary/binary.go +++ b/binary/binary.go @@ -6,7 +6,7 @@ type Binary interface { WriteTo(w io.Writer) (int64, error) } -func WriteOnto(b Binary, w io.Writer, n int64, err error) (int64, error) { +func WriteTo(b Binary, w io.Writer, n int64, err error) (int64, error) { if err != nil { return n, err } diff --git a/blocks/account.go b/blocks/account.go index b4b1915cc..110e414a4 100644 --- a/blocks/account.go +++ b/blocks/account.go @@ -33,12 +33,12 @@ func ReadAccountId(r io.Reader) AccountId { } func (self AccountId) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(self.Type, w, n, err) + n, err = WriteTo(self.Type, w, n, err) if self.Type == ACCOUNT_TYPE_NUMBER || self.Type == ACCOUNT_TYPE_BOTH { - n, err = WriteOnto(self.Number, w, n, err) + n, err = WriteTo(self.Number, w, n, err) } if self.Type == ACCOUNT_TYPE_PUBKEY || self.Type == ACCOUNT_TYPE_BOTH { - n, err = WriteOnto(self.PubKey, w, n, err) + n, err = WriteTo(self.PubKey, w, n, err) } return } diff --git a/blocks/adjustment.go b/blocks/adjustment.go index 175bda141..84a429b42 100644 --- a/blocks/adjustment.go +++ b/blocks/adjustment.go @@ -73,11 +73,11 @@ func (self *Bond) Type() Byte { } func (self *Bond) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(self.Type(), w, n, err) - n, err = WriteOnto(self.Fee, w, n, err) - n, err = WriteOnto(self.UnbondTo, w, n, err) - n, err = WriteOnto(self.Amount, w, n, err) - n, err = WriteOnto(self.Signature, w, n, err) + n, err = WriteTo(self.Type(), w, n, err) + n, err = WriteTo(self.Fee, w, n, err) + n, err = WriteTo(self.UnbondTo, w, n, err) + n, err = WriteTo(self.Amount, w, n, err) + n, err = WriteTo(self.Signature, w, n, err) return } @@ -94,10 +94,10 @@ func (self *Unbond) Type() Byte { } func (self *Unbond) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(self.Type(), w, n, err) - n, err = WriteOnto(self.Fee, w, n, err) - n, err = WriteOnto(self.Amount, w, n, err) - n, err = WriteOnto(self.Signature, w, n, err) + n, err = WriteTo(self.Type(), w, n, err) + n, err = WriteTo(self.Fee, w, n, err) + n, err = WriteTo(self.Amount, w, n, err) + n, err = WriteTo(self.Signature, w, n, err) return } @@ -113,9 +113,9 @@ func (self *Timeout) Type() Byte { } func (self *Timeout) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(self.Type(), w, n, err) - n, err = WriteOnto(self.Account, w, n, err) - n, err = WriteOnto(self.Penalty, w, n, err) + n, err = WriteTo(self.Type(), w, n, err) + n, err = WriteTo(self.Account, w, n, err) + n, err = WriteTo(self.Penalty, w, n, err) return } @@ -131,8 +131,8 @@ func (self *Dupeout) Type() Byte { } func (self *Dupeout) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(self.Type(), w, n, err) - n, err = WriteOnto(self.VoteA, w, n, err) - n, err = WriteOnto(self.VoteB, w, n, err) + n, err = WriteTo(self.Type(), w, n, err) + n, err = WriteTo(self.VoteA, w, n, err) + n, err = WriteTo(self.VoteB, w, n, err) return } diff --git a/blocks/block.go b/blocks/block.go index 3fcdd8dd4..70a247b5a 100644 --- a/blocks/block.go +++ b/blocks/block.go @@ -11,7 +11,7 @@ import ( type Block struct { Header Validation - Data + Txs // Checkpoint } @@ -19,7 +19,7 @@ func ReadBlock(r io.Reader) *Block { return &Block{ Header: ReadHeader(r), Validation: ReadValidation(r), - Data: ReadData(r), + Txs: ReadTxs(r), } } @@ -28,9 +28,9 @@ func (self *Block) Validate() bool { } func (self *Block) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(&self.Header, w, n, err) - n, err = WriteOnto(&self.Validation, w, n, err) - n, err = WriteOnto(&self.Data, w, n, err) + n, err = WriteTo(&self.Header, w, n, err) + n, err = WriteTo(&self.Validation, w, n, err) + n, err = WriteTo(&self.Txs, w, n, err) return } @@ -43,7 +43,7 @@ type Header struct { Time UInt64 PrevHash ByteSlice ValidationHash ByteSlice - DataHash ByteSlice + TxsHash ByteSlice } func ReadHeader(r io.Reader) Header { @@ -54,18 +54,18 @@ func ReadHeader(r io.Reader) Header { Time: ReadUInt64(r), PrevHash: ReadByteSlice(r), ValidationHash: ReadByteSlice(r), - DataHash: ReadByteSlice(r), + TxsHash: ReadByteSlice(r), } } func (self *Header) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(self.Name, w, n, err) - n, err = WriteOnto(self.Height, w, n, err) - n, err = WriteOnto(self.Fees, w, n, err) - n, err = WriteOnto(self.Time, w, n, err) - n, err = WriteOnto(self.PrevHash, w, n, err) - n, err = WriteOnto(self.ValidationHash, w, n, err) - n, err = WriteOnto(self.DataHash, w, n, err) + n, err = WriteTo(self.Name, w, n, err) + n, err = WriteTo(self.Height, w, n, err) + n, err = WriteTo(self.Fees, w, n, err) + n, err = WriteTo(self.Time, w, n, err) + n, err = WriteTo(self.PrevHash, w, n, err) + n, err = WriteTo(self.ValidationHash, w, n, err) + n, err = WriteTo(self.TxsHash, w, n, err) return } @@ -94,41 +94,41 @@ func ReadValidation(r io.Reader) Validation { } func (self *Validation) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(UInt64(len(self.Signatures)), w, n, err) - n, err = WriteOnto(UInt64(len(self.Adjustments)), w, n, err) + n, err = WriteTo(UInt64(len(self.Signatures)), w, n, err) + n, err = WriteTo(UInt64(len(self.Adjustments)), w, n, err) for _, sig := range self.Signatures { - n, err = WriteOnto(sig, w, n, err) + n, err = WriteTo(sig, w, n, err) } for _, adj := range self.Adjustments { - n, err = WriteOnto(adj, w, n, err) + n, err = WriteTo(adj, w, n, err) } return } -/* Block > Data */ +/* Block > Txs */ -type Data struct { +type Txs struct { Txs []Tx } -func ReadData(r io.Reader) Data { +func ReadTxs(r io.Reader) Txs { numTxs := int(ReadUInt64(r)) txs := make([]Tx, 0, numTxs) for i := 0; i < numTxs; i++ { txs = append(txs, ReadTx(r)) } - return Data{txs} + return Txs{txs} } -func (self *Data) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(UInt64(len(self.Txs)), w, n, err) +func (self *Txs) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(UInt64(len(self.Txs)), w, n, err) for _, tx := range self.Txs { - n, err = WriteOnto(tx, w, n, err) + n, err = WriteTo(tx, w, n, err) } return } -func (self *Data) MerkleHash() ByteSlice { +func (self *Txs) MerkleHash() ByteSlice { bs := make([]Binary, 0, len(self.Txs)) for i, tx := range self.Txs { bs[i] = Binary(tx) diff --git a/blocks/block_manager.go b/blocks/block_manager.go new file mode 100644 index 000000000..5c0825b3a --- /dev/null +++ b/blocks/block_manager.go @@ -0,0 +1,163 @@ +package blocks + +import ( + "github.com/tendermint/tendermint/p2p" +) + +const ( + BlocksCh = "block" + + msgTypeUnknown = Byte(0x00) + msgTypeState = Byte(0x01) + msgTypeRequest = Byte(0x02) + msgTypeData = Byte(0x03) + + dataTypeAll = byte(0x00) + dataTypeValidation = byte(0x01) + dataTypeTxs = byte(0x02) + // dataTypeCheckpoint = byte(0x04) +) + +/* + */ +type BlockManager struct { + quit chan struct{} + started uint32 + stopped uint32 +} + +func NewBlockManager() *BlockManager { + bm := &BlockManager{ + sw: sw, + quit: make(chan struct{}), + } + return bm +} + +func (bm *BlockManager) Start() { + if atomic.CompareAndSwapUint32(&bm.started, 0, 1) { + log.Info("Starting BlockManager") + go bm.XXX() + } +} + +func (bm *BlockManager) Stop() { + if atomic.CompareAndSwapUint32(&bm.stopped, 0, 1) { + log.Info("Stopping BlockManager") + close(bm.quit) + } +} + +func (bm *BlockManager) XXX() { +} + +//----------------------------------------------------------------------------- + +/* Messages */ + +// TODO: check for unnecessary extra bytes at the end. +func decodeMessage(bz ByteSlice) (msg Message) { + // log.Debug("decoding msg bytes: %X", bz) + switch Byte(bz[0]) { + case msgTypeState: + return &StateMessage{} + case msgTypeRequest: + return readRequestMessage(bytes.NewReader(bz[1:])) + case msgTypeData: + return readDataMessage(bytes.NewReader(bz[1:])) + default: + return nil + } +} + +/* +A StateMessage declares what (contiguous) blocks & headers are known. + +LastValidationHeight >= LastBlockHeight. +*/ +type StateMessage struct { + LastBlockHeight UInt64 + LastValidationHeight UInt64 +} + +func readStateMessage(r io.Reader) *StateMessage { + lastBlockHeight := ReadUInt64(r) + lastValidationHeight := ReadUInt64(r) + return &StateMessage{ + LastBlockHeight: lastBlockHeight, + LastValidationHeight: lastValidationHeight, + } +} + +func (m *StateMessage) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(msgTypeState, w, n, err) + n, err = WriteTo(m.LastBlockHeight, w, n, err) + n, err = WriteTo(m.LastValidationHeight, w, n, err) + return +} + +func (m *StateMessage) String() string { + return fmt.Sprintf("[State %v/%v]", + m.LastBlockHeight, m.LastValidationHeight) +} + +/* +A RequestMessage requests a block and/or header at a given height. +*/ +type RequestMessage struct { + Type Byte + Height UInt64 +} + +func readRequestMessage(r io.Reader) *RequestMessage { + requestType := ReadByte(r) + height := ReadUInt64(r) + return &RequestMessage{ + Type: requestType, + Height: height, + } +} + +func (m *RequestMessage) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(msgTypeRequest, w, n, err) + n, err = WriteTo(m.Type, w, n, err) + n, err = WriteTo(m.Height, w, n, err) + return +} + +func (m *RequestMessage) String() string { + return fmt.Sprintf("[Request %X@%v]", m.Type, m.Height) +} + +/* +A DataMessage contains block data, maybe requested. +The data can be a Validation, Txs, or whole Block object. +*/ +type DataMessage struct { + Type Byte + Height UInt64 + Bytes ByteSlice +} + +func readDataMessage(r io.Reader) *DataMessage { + dataType := ReadByte(r) + height := ReadUInt64(r) + bytes := ReadByteSlice(r) + return &DataMessage{ + Type: dataType, + Height: height, + Bytes: bytes, + } +} + +func (m *DataMessage) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(msgTypeData, w, n, err) + n, err = WriteTo(m.Type, w, n, err) + n, err = WriteTo(m.Height, w, n, err) + n, err = WriteTo(m.Bytes, w, n, err) + return +} + +func (m *DataMessage) String() string { + return fmt.Sprintf("[Data %X@%v]", m.Type, m.Height) +} diff --git a/blocks/codec_test.go b/blocks/codec_test.go index 41ed0434c..0b98a8a1a 100644 --- a/blocks/codec_test.go +++ b/blocks/codec_test.go @@ -21,7 +21,7 @@ func BenchmarkTestCustom(b *testing.B) { Time: 123, PrevHash: ByteSlice("prevhash"), ValidationHash: ByteSlice("validationhash"), - DataHash: ByteSlice("datahash"), + TxsHash: ByteSlice("txshash"), } buf := bytes.NewBuffer(nil) @@ -44,7 +44,7 @@ type HHeader struct { Time uint64 `json:"T"` PrevHash []byte `json:"PH"` ValidationHash []byte `json:"VH"` - DataHash []byte `json:"DH"` + TxsHash []byte `json:"DH"` } func BenchmarkTestJSON(b *testing.B) { @@ -57,7 +57,7 @@ func BenchmarkTestJSON(b *testing.B) { Time: 123, PrevHash: []byte("prevhash"), ValidationHash: []byte("validationhash"), - DataHash: []byte("datahash"), + TxsHash: []byte("txshash"), } h2 := &HHeader{} @@ -86,7 +86,7 @@ func BenchmarkTestGob(b *testing.B) { Time: 123, PrevHash: []byte("prevhash"), ValidationHash: []byte("validationhash"), - DataHash: []byte("datahash"), + TxsHash: []byte("txshash"), } h2 := &Header{} @@ -115,7 +115,7 @@ func BenchmarkTestMsgPack(b *testing.B) { Time: 123, PrevHash: []byte("prevhash"), ValidationHash: []byte("validationhash"), - DataHash: []byte("datahash"), + TxsHash: []byte("txshash"), } h2 := &Header{} @@ -144,7 +144,7 @@ func BenchmarkTestMsgPack2(b *testing.B) { Time: 123, PrevHash: []byte("prevhash"), ValidationHash: []byte("validationhash"), - DataHash: []byte("datahash"), + TxsHash: []byte("txshash"), } h2 := &Header{} var mh codec.MsgpackHandle diff --git a/blocks/signature.go b/blocks/signature.go index f72b4f921..cdd1f361d 100644 --- a/blocks/signature.go +++ b/blocks/signature.go @@ -31,8 +31,8 @@ func ReadSignature(r io.Reader) Signature { } func (self Signature) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(self.Signer, w, n, err) - n, err = WriteOnto(self.SigBytes, w, n, err) + n, err = WriteTo(self.Signer, w, n, err) + n, err = WriteTo(self.SigBytes, w, n, err) return } diff --git a/blocks/tx.go b/blocks/tx.go index d9f08f4cc..441a10081 100644 --- a/blocks/tx.go +++ b/blocks/tx.go @@ -66,11 +66,11 @@ func (self *SendTx) Type() Byte { } func (self *SendTx) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(self.Type(), w, n, err) - n, err = WriteOnto(self.Fee, w, n, err) - n, err = WriteOnto(self.To, w, n, err) - n, err = WriteOnto(self.Amount, w, n, err) - n, err = WriteOnto(self.Signature, w, n, err) + n, err = WriteTo(self.Type(), w, n, err) + n, err = WriteTo(self.Fee, w, n, err) + n, err = WriteTo(self.To, w, n, err) + n, err = WriteTo(self.Amount, w, n, err) + n, err = WriteTo(self.Signature, w, n, err) return } @@ -88,10 +88,10 @@ func (self *NameTx) Type() Byte { } func (self *NameTx) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(self.Type(), w, n, err) - n, err = WriteOnto(self.Fee, w, n, err) - n, err = WriteOnto(self.Name, w, n, err) - n, err = WriteOnto(self.PubKey, w, n, err) - n, err = WriteOnto(self.Signature, w, n, err) + n, err = WriteTo(self.Type(), w, n, err) + n, err = WriteTo(self.Fee, w, n, err) + n, err = WriteTo(self.Name, w, n, err) + n, err = WriteTo(self.PubKey, w, n, err) + n, err = WriteTo(self.Signature, w, n, err) return } diff --git a/blocks/vote.go b/blocks/vote.go index 3e64bdd66..c1500f05e 100644 --- a/blocks/vote.go +++ b/blocks/vote.go @@ -25,8 +25,8 @@ func ReadVote(r io.Reader) Vote { } func (self Vote) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(self.Height, w, n, err) - n, err = WriteOnto(self.BlockHash, w, n, err) - n, err = WriteOnto(self.Signature, w, n, err) + n, err = WriteTo(self.Height, w, n, err) + n, err = WriteTo(self.BlockHash, w, n, err) + n, err = WriteTo(self.Signature, w, n, err) return } diff --git a/main.go b/main.go index 4f3b96f0a..fa7a3ba2c 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,5 @@ package main -// TODO: ensure Mark* gets called. - import ( "os" "os/signal" @@ -116,8 +114,7 @@ func (n *Node) switchEventsHandler() { if event.Peer.IsOutbound() { n.sendOurExternalAddrs(event.Peer) if n.book.NeedMoreAddrs() { - pkt := p2p.NewPacket(p2p.PexCh, p2p.NewPexRequestMessage()) - event.Peer.TrySend(pkt) + n.pmgr.RequestPEX(event.Peer) } } case p2p.SwitchEventDonePeer: @@ -132,8 +129,7 @@ func (n *Node) sendOurExternalAddrs(peer *p2p.Peer) { for _, l := range n.lz { addrs = append(addrs, l.ExternalAddress()) } - msg := &p2p.PexAddrsMessage{Addrs: addrs} - peer.Send(p2p.NewPacket(p2p.PexCh, msg)) + n.pmgr.SendAddrs(peer, addrs) // On the remote end, the pexHandler may choose // to add these to its book. } diff --git a/p2p/msg.go b/p2p/msg.go index 6901a801c..57eeb9b63 100644 --- a/p2p/msg.go +++ b/p2p/msg.go @@ -1,9 +1,94 @@ package p2p import ( + "bytes" + "fmt" + "io" + . "github.com/tendermint/tendermint/binary" ) +/* +A Message is anything that can be serialized. +The resulting serialized bytes of Message don't contain type information, +so messages are typically wrapped in a TypedMessage before put in the wire. +*/ type Message interface { Binary } + +/* +A TypedMessage extends a Message with a single byte of type information. +When deserializing a message from the wire, a switch statement is needed +to dispatch to the correct constructor, typically named "ReadXXXMessage". +*/ +type TypedMessage struct { + Type Byte + Message Message +} + +func (tm TypedMessage) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(tm.Type, w, n, err) + n, err = WriteTo(tm.Message, w, n, err) + return +} + +func (tm TypedMessage) String() string { + return fmt.Sprintf("0x%X⋺%v", tm.Type, tm.Message) +} + +//----------------------------------------------------------------------------- + +/* +Packet encapsulates a ByteSlice on a Channel. +Typically the Bytes are the serialized form of a TypedMessage. +*/ +type Packet struct { + Channel String + Bytes ByteSlice + // Hash +} + +func NewPacket(chName String, msg Binary) Packet { + msgBytes := BinaryBytes(msg) + return Packet{ + Channel: chName, + Bytes: msgBytes, + } +} + +func (p Packet) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(p.Channel, w, n, err) + n, err = WriteTo(p.Bytes, w, n, err) + return +} + +func (p Packet) Reader() io.Reader { + return bytes.NewReader(p.Bytes) +} + +func (p Packet) String() string { + return fmt.Sprintf("%v:%X", p.Channel, p.Bytes) +} + +func ReadPacketSafe(r io.Reader) (pkt Packet, err error) { + chName, err := ReadStringSafe(r) + if err != nil { + return + } + // TODO: packet length sanity check. + bytes, err := ReadByteSliceSafe(r) + if err != nil { + return + } + return Packet{Channel: chName, Bytes: bytes}, nil +} + +/* +InboundPacket extends Packet with fields relevant to inbound packets. +*/ +type InboundPacket struct { + Peer *Peer + Time Time + Packet +} diff --git a/p2p/netaddress.go b/p2p/netaddress.go index 02570ccde..09977d36b 100644 --- a/p2p/netaddress.go +++ b/p2p/netaddress.go @@ -62,8 +62,8 @@ func ReadNetAddress(r io.Reader) *NetAddress { } func (na *NetAddress) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(ByteSlice(na.IP.To16()), w, n, err) - n, err = WriteOnto(na.Port, w, n, err) + n, err = WriteTo(ByteSlice(na.IP.To16()), w, n, err) + n, err = WriteTo(na.Port, w, n, err) return } diff --git a/p2p/peer.go b/p2p/peer.go index f3149ca43..0ff2ecbdb 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -1,7 +1,6 @@ package p2p import ( - "bytes" "fmt" "io" "sync/atomic" @@ -212,58 +211,3 @@ func (c *Channel) RecvQueue() <-chan Packet { func (c *Channel) SendQueue() chan<- Packet { return c.sendQueue } - -//----------------------------------------------------------------------------- - -/* -Packet encapsulates a ByteSlice on a Channel. -*/ -type Packet struct { - Channel String - Bytes ByteSlice - // Hash -} - -func NewPacket(chName String, msg Binary) Packet { - msgBytes := BinaryBytes(msg) - return Packet{ - Channel: chName, - Bytes: msgBytes, - } -} - -func (p Packet) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(p.Channel, w, n, err) - n, err = WriteOnto(p.Bytes, w, n, err) - return -} - -func (p Packet) Reader() io.Reader { - return bytes.NewReader(p.Bytes) -} - -func (p Packet) String() string { - return fmt.Sprintf("%v:%X", p.Channel, p.Bytes) -} - -func ReadPacketSafe(r io.Reader) (pkt Packet, err error) { - chName, err := ReadStringSafe(r) - if err != nil { - return - } - // TODO: packet length sanity check. - bytes, err := ReadByteSliceSafe(r) - if err != nil { - return - } - return Packet{Channel: chName, Bytes: bytes}, nil -} - -/* -InboundPacket extends Packet with fields relevant to inbound packets. -*/ -type InboundPacket struct { - Peer *Peer - Time Time - Packet -} diff --git a/p2p/peer_manager.go b/p2p/peer_manager.go index 4b48141f7..b8bcbc758 100644 --- a/p2p/peer_manager.go +++ b/p2p/peer_manager.go @@ -15,7 +15,7 @@ import ( var pexErrInvalidMessage = errors.New("Invalid PEX message") const ( - PexCh = "PEX" + pexCh = "PEX" ensurePeersPeriodSeconds = 30 minNumOutboundPeers = 10 maxNumPeers = 50 @@ -44,7 +44,7 @@ func NewPeerManager(sw *Switch, book *AddrBook) *PeerManager { func (pm *PeerManager) Start() { if atomic.CompareAndSwapUint32(&pm.started, 0, 1) { - log.Info("Starting peerManager") + log.Info("Starting PeerManager") go pm.ensurePeersHandler() go pm.pexHandler() } @@ -52,11 +52,24 @@ func (pm *PeerManager) Start() { func (pm *PeerManager) Stop() { if atomic.CompareAndSwapUint32(&pm.stopped, 0, 1) { - log.Info("Stopping peerManager") + log.Info("Stopping PeerManager") close(pm.quit) } } +// Asks peer for more addresses. +func (pm *PeerManager) RequestPEX(peer *Peer) { + msg := &pexRequestMessage{} + tm := TypedMessage{msgTypeRequest, msg} + peer.TrySend(NewPacket(pexCh, tm)) +} + +func (pm *PeerManager) SendAddrs(peer *Peer, addrs []*NetAddress) { + msg := &pexAddrsMessage{Addrs: addrs} + tm := TypedMessage{msgTypeAddrs, msg} + peer.Send(NewPacket(pexCh, tm)) +} + // Ensures that sufficient peers are connected. (continuous) func (pm *PeerManager) ensurePeersHandler() { // fire once immediately. @@ -124,11 +137,11 @@ func (pm *PeerManager) ensurePeers() { } } -// Handles incoming Pex messages. +// Handles incoming PEX messages. func (pm *PeerManager) pexHandler() { for { - inPkt := pm.sw.Receive(PexCh) // {Peer, Time, Packet} + inPkt := pm.sw.Receive(pexCh) // {Peer, Time, Packet} if inPkt == nil { // Client has stopped break @@ -139,22 +152,22 @@ func (pm *PeerManager) pexHandler() { log.Info("pexHandler received %v", msg) switch msg.(type) { - case *PexRequestMessage: + case *pexRequestMessage: // inPkt.Peer requested some peers. // TODO: prevent abuse. addrs := pm.book.GetSelection() - response := &PexAddrsMessage{Addrs: addrs} - pkt := NewPacket(PexCh, response) - queued := inPkt.Peer.TrySend(pkt) + msg := &pexAddrsMessage{Addrs: addrs} + tm := TypedMessage{msgTypeRequest, msg} + queued := inPkt.Peer.TrySend(NewPacket(pexCh, tm)) if !queued { // ignore } - case *PexAddrsMessage: + case *pexAddrsMessage: // We received some peer addresses from inPkt.Peer. // TODO: prevent abuse. // (We don't want to get spammed with bad peers) srcAddr := inPkt.Peer.RemoteAddress() - for _, addr := range msg.(*PexAddrsMessage).Addrs { + for _, addr := range msg.(*pexAddrsMessage).Addrs { pm.book.AddAddress(addr, srcAddr) } default: @@ -172,18 +185,18 @@ func (pm *PeerManager) pexHandler() { /* Messages */ const ( - pexTypeUnknown = Byte(0x00) - pexTypeRequest = Byte(0x01) - pexTypeAddrs = Byte(0x02) + msgTypeUnknown = Byte(0x00) + msgTypeRequest = Byte(0x01) + msgTypeAddrs = Byte(0x02) ) // TODO: check for unnecessary extra bytes at the end. func decodeMessage(bz ByteSlice) (msg Message) { // log.Debug("decoding msg bytes: %X", bz) switch Byte(bz[0]) { - case pexTypeRequest: - return &PexRequestMessage{} - case pexTypeAddrs: + case msgTypeRequest: + return &pexRequestMessage{} + case msgTypeAddrs: return readPexAddrsMessage(bytes.NewReader(bz[1:])) default: return nil @@ -191,54 +204,46 @@ func decodeMessage(bz ByteSlice) (msg Message) { } /* -A PexRequestMessage requests additional peer addresses. +A pexRequestMessage requests additional peer addresses. */ -type PexRequestMessage struct { +type pexRequestMessage struct { } -// TODO: define NewPexRequestPacket instead? - -func NewPexRequestMessage() *PexRequestMessage { - return &PexRequestMessage{} +func (m *pexRequestMessage) WriteTo(w io.Writer) (n int64, err error) { + return // nothing to write. } -func (m *PexRequestMessage) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(pexTypeRequest, w, n, err) - return -} - -func (m *PexRequestMessage) String() string { - return "[PexRequest]" +func (m *pexRequestMessage) String() string { + return "[pexRequest]" } /* A message with announced peer addresses. */ -type PexAddrsMessage struct { +type pexAddrsMessage struct { Addrs []*NetAddress } -func readPexAddrsMessage(r io.Reader) *PexAddrsMessage { +func readPexAddrsMessage(r io.Reader) *pexAddrsMessage { numAddrs := int(ReadUInt32(r)) addrs := []*NetAddress{} for i := 0; i < numAddrs; i++ { addr := ReadNetAddress(r) addrs = append(addrs, addr) } - return &PexAddrsMessage{ + return &pexAddrsMessage{ Addrs: addrs, } } -func (m *PexAddrsMessage) WriteTo(w io.Writer) (n int64, err error) { - n, err = WriteOnto(pexTypeAddrs, w, n, err) - n, err = WriteOnto(UInt32(len(m.Addrs)), w, n, err) +func (m *pexAddrsMessage) WriteTo(w io.Writer) (n int64, err error) { + n, err = WriteTo(UInt32(len(m.Addrs)), w, n, err) for _, addr := range m.Addrs { - n, err = WriteOnto(addr, w, n, err) + n, err = WriteTo(addr, w, n, err) } return } -func (m *PexAddrsMessage) String() string { - return fmt.Sprintf("[PexAddrs %v]", m.Addrs) +func (m *pexAddrsMessage) String() string { + return fmt.Sprintf("[pexAddrs %v]", m.Addrs) }