From e52166017de6401d2065dbacea4b43397baa5d7d Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sun, 29 Jun 2014 10:48:52 -0700 Subject: [PATCH] writing client_test for round-trip messages. --- peer/client.go | 14 +++++++++----- peer/client_test.go | 9 ++++++--- peer/connection.go | 18 +++++++++++++++--- peer/peer.go | 26 +++++++++++++++++--------- 4 files changed, 47 insertions(+), 20 deletions(-) diff --git a/peer/client.go b/peer/client.go index 9b08a48d6..4a8c88f94 100644 --- a/peer/client.go +++ b/peer/client.go @@ -2,7 +2,6 @@ package peer import ( . "github.com/tendermint/tendermint/common" - . "github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/merkle" "sync/atomic" "sync" @@ -25,7 +24,7 @@ type Client struct { targetNumPeers int makePeerFn func(*Connection) *Peer self *Peer - inQueues map[String]chan *InboundMsg + inQueues map[string]chan *InboundMsg mtx sync.Mutex peers merkle.Tree // addr -> *Peer @@ -44,7 +43,7 @@ func NewClient(makePeerFn func(*Connection) *Peer) *Client { Panicf("makePeerFn(nil) must return a prototypical peer for self") } - inQueues := make(map[String]chan *InboundMsg) + inQueues := make(map[string]chan *InboundMsg) for chName, _ := range self.channels { inQueues[chName] = make(chan *InboundMsg) } @@ -95,21 +94,26 @@ func (c *Client) AddPeerWithConnection(conn *Connection, outgoing bool) (*Peer, return peer, nil } -func (c *Client) Broadcast(chName String, msg Msg) { +func (c *Client) Broadcast(chName string, msg Msg) { if atomic.LoadUint32(&c.stopped) == 1 { return } + log.Tracef("Broadcast on [%v] msg: %v", chName, msg) for v := range c.Peers().Values() { peer := v.(*Peer) success := peer.TryQueueOut(chName , msg) + log.Tracef("Broadcast for peer %v success: %v", peer, success) if !success { // TODO: notify the peer } } + } -func (c *Client) PopMessage(chName String) *InboundMsg { +// blocks until a message is popped. +func (c *Client) PopMessage(chName string) *InboundMsg { if atomic.LoadUint32(&c.stopped) == 1 { return nil } + log.Tracef("PopMessage on [%v]", chName) q := c.inQueues[chName] if q == nil { Panicf("Expected inQueues[%f], found none", chName) } diff --git a/peer/client_test.go b/peer/client_test.go index e971b0022..6c6a11a25 100644 --- a/peer/client_test.go +++ b/peer/client_test.go @@ -11,7 +11,7 @@ func TestConnection(t *testing.T) { peerMaker := func(conn *Connection) *Peer { bufferSize := 10 p := NewPeer(conn) - p.channels = map[String]*Channel{} + p.channels = map[string]*Channel{} p.channels["ch1"] = NewChannel("ch1", bufferSize) p.channels["ch2"] = NewChannel("ch2", bufferSize) return p @@ -44,8 +44,11 @@ func TestConnection(t *testing.T) { } // TODO: test the transmission of information on channels. - time.Sleep(500 * time.Millisecond) - //inMsg := c2.PopMessage(String("")) + c1.Broadcast("ch1", Msg{Bytes:ByteSlice("test data")}) + time.Sleep(100 * time.Millisecond) + inMsg := c2.PopMessage("ch1") + + t.Logf("c2 popped message: %v", inMsg) s1.Stop() c2.Stop() diff --git a/peer/connection.go b/peer/connection.go index 348b7e91e..d04aba037 100644 --- a/peer/connection.go +++ b/peer/connection.go @@ -56,12 +56,14 @@ func (c *Connection) QueueOut(msg ByteSlice) bool { } func (c *Connection) Start() { + log.Debugf("Starting %v", c) go c.outHandler() go c.inHandler() } func (c *Connection) Stop() { if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) { + log.Debugf("Stopping %v", c) close(c.quit) c.conn.Close() c.pingDebouncer.Stop() @@ -90,6 +92,7 @@ func (c *Connection) flush() { } func (c *Connection) outHandler() { + log.Tracef("Connection %v outHandler", c) FOR_LOOP: for { @@ -98,6 +101,9 @@ func (c *Connection) outHandler() { case <-c.pingDebouncer.Ch: _, err = PACKET_TYPE_PING.WriteTo(c.conn) case outMsg := <-c.outQueue: + log.Tracef("Found msg from outQueue. Writing msg to underlying connection") + _, err = PACKET_TYPE_MSG.WriteTo(c.conn) + if err != nil { break } _, err = outMsg.WriteTo(c.conn) case <-c.pong: _, err = PACKET_TYPE_PONG.WriteTo(c.conn) @@ -114,20 +120,24 @@ func (c *Connection) outHandler() { c.flush() } + log.Tracef("Connection %v outHandler done", c) + // cleanup } func (c *Connection) inHandler() { + log.Tracef("Connection %v inHandler", c) FOR_LOOP: for { msgType, err := ReadUInt8Safe(c.conn) - if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { log.Infof("Connection %v failed @ inHandler", c) c.Stop() } break FOR_LOOP + } else { + log.Tracef("Found msgType %v", msgType) } switch msgType { @@ -145,9 +155,10 @@ func (c *Connection) inHandler() { break FOR_LOOP } // What to do? - // TODO + // XXX + XXX well, we need to push it into the channel or something. + or at least provide an inQueue. log.Tracef("%v", msg) - default: Panicf("Unknown message type %v", msgType) } @@ -155,6 +166,7 @@ func (c *Connection) inHandler() { c.pingDebouncer.Reset() } + log.Tracef("Connection %v inHandler done", c) // cleanup close(c.pong) for _ = range c.pong { diff --git a/peer/peer.go b/peer/peer.go index a1197628d..cf59a883b 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -14,7 +14,7 @@ import ( type Peer struct { outgoing bool conn *Connection - channels map[String]*Channel + channels map[string]*Channel mtx sync.Mutex quit chan struct{} @@ -29,7 +29,9 @@ func NewPeer(conn *Connection) *Peer { } } -func (p *Peer) Start(peerInQueues map[String]chan *InboundMsg ) { +func (p *Peer) Start(peerInQueues map[string]chan *InboundMsg ) { + log.Debugf("Starting %v", p) + p.conn.Start() for chName, _ := range p.channels { go p.inHandler(chName, peerInQueues[chName]) go p.outHandler(chName) @@ -40,6 +42,7 @@ func (p *Peer) Stop() { // lock p.mtx.Lock() if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) { + log.Debugf("Stopping %v", p) close(p.quit) p.conn.Stop() } @@ -55,13 +58,13 @@ func (p *Peer) RemoteAddress() *NetAddress { return p.conn.RemoteAddress() } -func (p *Peer) Channel(chName String) *Channel { +func (p *Peer) Channel(chName string) *Channel { return p.channels[chName] } // Queue the msg for output. // If the queue is full, just return false. -func (p *Peer) TryQueueOut(chName String, msg Msg) bool { +func (p *Peer) TryQueueOut(chName string, msg Msg) bool { channel := p.Channel(chName) outQueue := channel.OutQueue() @@ -85,7 +88,8 @@ func (p *Peer) String() string { return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outgoing) } -func (p *Peer) inHandler(chName String, inboundMsgQueue chan<- *InboundMsg) { +func (p *Peer) inHandler(chName string, inboundMsgQueue chan<- *InboundMsg) { + log.Tracef("Peer %v inHandler [%v]", p, chName) channel := p.channels[chName] inQueue := channel.InQueue() @@ -111,11 +115,13 @@ func (p *Peer) inHandler(chName String, inboundMsgQueue chan<- *InboundMsg) { } } + log.Tracef("Peer %v inHandler [%v] closed", p, chName) // cleanup // (none) } -func (p *Peer) outHandler(chName String) { +func (p *Peer) outHandler(chName string) { + log.Tracef("Peer %v outHandler [%v]", p, chName) outQueue := p.channels[chName].outQueue FOR_LOOP: for { @@ -123,12 +129,14 @@ func (p *Peer) outHandler(chName String) { case <-p.quit: break FOR_LOOP case msg := <-outQueue: + log.Tracef("Sending msg to peer outQueue") // blocks until the connection is Stop'd, // which happens when this peer is Stop'd. p.conn.QueueOut(msg.Bytes) } } + log.Tracef("Peer %v outHandler [%v] closed", p, chName) // cleanup // (none) } @@ -137,7 +145,7 @@ func (p *Peer) outHandler(chName String) { /* Channel */ type Channel struct { - name String + name string inQueue chan Msg outQueue chan Msg //stats Stats @@ -145,13 +153,13 @@ type Channel struct { func NewChannel(name string, bufferSize int) *Channel { return &Channel{ - name: String(name), + name: name, inQueue: make(chan Msg, bufferSize), outQueue: make(chan Msg, bufferSize), } } -func (c *Channel) Name() String { +func (c *Channel) Name() string { return c.name }