diff --git a/internal/libs/sync/closer.go b/internal/libs/sync/closer.go
deleted file mode 100644
index 815ee1e80..000000000
--- a/internal/libs/sync/closer.go
+++ /dev/null
@@ -1,31 +0,0 @@
-package sync
-
-import "sync"
-
-// Closer implements a primitive to close a channel that signals process
-// termination while allowing a caller to call Close multiple times safely. It
-// should be used in cases where guarantees cannot be made about when and how
-// many times closure is executed.
-type Closer struct {
-    closeOnce sync.Once
-    doneCh    chan struct{}
-}
-
-// NewCloser returns a reference to a new Closer.
-func NewCloser() *Closer {
-    return &Closer{doneCh: make(chan struct{})}
-}
-
-// Done returns the internal done channel allowing the caller either block or wait
-// for the Closer to be terminated/closed.
-func (c *Closer) Done() <-chan struct{} {
-    return c.doneCh
-}
-
-// Close gracefully closes the Closer. A caller should only call Close once, but
-// it is safe to call it successive times.
-func (c *Closer) Close() {
-    c.closeOnce.Do(func() {
-        close(c.doneCh)
-    })
-}
diff --git a/internal/libs/sync/closer_test.go b/internal/libs/sync/closer_test.go
deleted file mode 100644
index aea915215..000000000
--- a/internal/libs/sync/closer_test.go
+++ /dev/null
@@ -1,28 +0,0 @@
-package sync_test
-
-import (
-    "testing"
-    "time"
-
-    "github.com/stretchr/testify/require"
-    tmsync "github.com/tendermint/tendermint/internal/libs/sync"
-)
-
-func TestCloser(t *testing.T) {
-    closer := tmsync.NewCloser()
-
-    var timeout bool
-
-    select {
-    case <-closer.Done():
-    case <-time.After(time.Second):
-        timeout = true
-    }
-
-    for i := 0; i < 10; i++ {
-        closer.Close()
-    }
-
-    require.True(t, timeout)
-    <-closer.Done()
-}
diff --git a/internal/p2p/pqueue.go b/internal/p2p/pqueue.go
index ebfa2885b..229fb8203 100644
--- a/internal/p2p/pqueue.go
+++ b/internal/p2p/pqueue.go
@@ -5,10 +5,10 @@ import (
     "context"
     "sort"
     "strconv"
+    "sync"
     "time"
 
     "github.com/gogo/protobuf/proto"
-    tmsync "github.com/tendermint/tendermint/internal/libs/sync"
     "github.com/tendermint/tendermint/libs/log"
 )
 
@@ -78,8 +78,10 @@ type pqScheduler struct {
 
     enqueueCh chan Envelope
     dequeueCh chan Envelope
-    closer    *tmsync.Closer
-    done      *tmsync.Closer
+
+    closeFn func()
+    closeCh <-chan struct{}
+    done    chan struct{}
 }
 
 func newPQScheduler(
@@ -108,6 +110,9 @@ func newPQScheduler(
     pq := make(priorityQueue, 0)
     heap.Init(&pq)
 
+    closeCh := make(chan struct{})
+    once := &sync.Once{}
+
     return &pqScheduler{
         logger:    logger.With("router", "scheduler"),
         metrics:   m,
@@ -118,32 +123,18 @@ func newPQScheduler(
         sizes:     sizes,
         enqueueCh: make(chan Envelope, enqueueBuf),
         dequeueCh: make(chan Envelope, dequeueBuf),
-        closer:    tmsync.NewCloser(),
-        done:      tmsync.NewCloser(),
+        closeFn:   func() { once.Do(func() { close(closeCh) }) },
+        closeCh:   closeCh,
+        done:      make(chan struct{}),
     }
 }
 
-func (s *pqScheduler) enqueue() chan<- Envelope {
-    return s.enqueueCh
-}
-
-func (s *pqScheduler) dequeue() <-chan Envelope {
-    return s.dequeueCh
-}
-
-func (s *pqScheduler) close() {
-    s.closer.Close()
-    <-s.done.Done()
-}
-
-func (s *pqScheduler) closed() <-chan struct{} {
-    return s.closer.Done()
-}
-
 // start starts non-blocking process that starts the priority queue scheduler.
-func (s *pqScheduler) start(ctx context.Context) {
-    go s.process(ctx)
-}
+func (s *pqScheduler) start(ctx context.Context) { go s.process(ctx) }
+func (s *pqScheduler) enqueue() chan<- Envelope { return s.enqueueCh }
+func (s *pqScheduler) dequeue() <-chan Envelope { return s.dequeueCh }
+func (s *pqScheduler) close() { s.closeFn() }
+func (s *pqScheduler) closed() <-chan struct{} { return s.done }
 
 // process starts a block process where we listen for Envelopes to enqueue. If
 // there is sufficient capacity, it will be enqueued into the priority queue,
@@ -155,7 +146,7 @@ func (s *pqScheduler) start(ctx context.Context) {
 // After we attempt to enqueue the incoming Envelope, if the priority queue is
 // non-empty, we pop the top Envelope and send it on the dequeueCh.
 func (s *pqScheduler) process(ctx context.Context) {
-    defer s.done.Close()
+    defer close(s.done)
 
     for {
         select {
@@ -264,13 +255,13 @@ func (s *pqScheduler) process(ctx context.Context) {
                     "peer_id", string(pqEnv.envelope.To)).Add(float64(-pqEnv.size))
                 select {
                 case s.dequeueCh <- pqEnv.envelope:
-                case <-s.closer.Done():
+                case <-s.closeCh:
                     return
                 }
             }
         case <-ctx.Done():
             return
-        case <-s.closer.Done():
+        case <-s.closeCh:
             return
         }
     }
diff --git a/internal/p2p/queue.go b/internal/p2p/queue.go
index cf36d3ca6..2ce2f23fe 100644
--- a/internal/p2p/queue.go
+++ b/internal/p2p/queue.go
@@ -1,7 +1,7 @@
 package p2p
 
 import (
-    tmsync "github.com/tendermint/tendermint/internal/libs/sync"
+    "sync"
 )
 
 // default capacity for the size of a queue
@@ -32,28 +32,22 @@ type queue interface {
 // in the order they were received, and blocks until message is received.
 type fifoQueue struct {
     queueCh chan Envelope
-    closer  *tmsync.Closer
+    closeFn func()
+    closeCh <-chan struct{}
 }
 
 func newFIFOQueue(size int) queue {
+    closeCh := make(chan struct{})
+    once := &sync.Once{}
+
     return &fifoQueue{
         queueCh: make(chan Envelope, size),
-        closer:  tmsync.NewCloser(),
+        closeFn: func() { once.Do(func() { close(closeCh) }) },
+        closeCh: closeCh,
     }
 }
 
-func (q *fifoQueue) enqueue() chan<- Envelope {
-    return q.queueCh
-}
-
-func (q *fifoQueue) dequeue() <-chan Envelope {
-    return q.queueCh
-}
-
-func (q *fifoQueue) close() {
-    q.closer.Close()
-}
-
-func (q *fifoQueue) closed() <-chan struct{} {
-    return q.closer.Done()
-}
+func (q *fifoQueue) enqueue() chan<- Envelope { return q.queueCh }
+func (q *fifoQueue) dequeue() <-chan Envelope { return q.queueCh }
+func (q *fifoQueue) close() { q.closeFn() }
+func (q *fifoQueue) closed() <-chan struct{} { return q.closeCh }
diff --git a/internal/p2p/router_filter_test.go b/internal/p2p/router_filter_test.go
index 4082dc928..cfef6d10b 100644
--- a/internal/p2p/router_filter_test.go
+++ b/internal/p2p/router_filter_test.go
@@ -8,7 +8,6 @@ import (
     "time"
 
     "github.com/stretchr/testify/require"
-    "github.com/tendermint/tendermint/internal/libs/sync"
     "github.com/tendermint/tendermint/libs/log"
 )
 
@@ -29,6 +28,6 @@ func TestConnectionFiltering(t *testing.T) {
         },
     }
     require.Equal(t, 0, filterByIPCount)
-    router.openConnection(ctx, &MemoryConnection{logger: logger, closer: sync.NewCloser()})
+    router.openConnection(ctx, &MemoryConnection{logger: logger, closeFn: func() {}})
     require.Equal(t, 1, filterByIPCount)
 }
diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go
index e4d78529a..2b5632104 100644
--- a/internal/p2p/router_test.go
+++ b/internal/p2p/router_test.go
@@ -19,7 +19,6 @@ import (
     dbm "github.com/tendermint/tm-db"
 
     "github.com/tendermint/tendermint/crypto"
-    tmsync "github.com/tendermint/tendermint/internal/libs/sync"
"github.com/tendermint/tendermint/internal/libs/sync" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p/mocks" "github.com/tendermint/tendermint/internal/p2p/p2ptest" @@ -385,12 +384,12 @@ func TestRouter_AcceptPeers(t *testing.T) { t.Cleanup(leaktest.Check(t)) // Set up a mock transport that handshakes. - closer := tmsync.NewCloser() + connCtx, connCancel := context.WithCancel(context.Background()) mockConnection := &mocks.Connection{} mockConnection.On("String").Maybe().Return("mock") mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey). Return(tc.peerInfo, tc.peerKey, nil) - mockConnection.On("Close").Run(func(_ mock.Arguments) { closer.Close() }).Return(nil).Maybe() + mockConnection.On("Close").Run(func(_ mock.Arguments) { connCancel() }).Return(nil).Maybe() mockConnection.On("RemoteEndpoint").Return(p2p.Endpoint{}) if tc.ok { mockConnection.On("ReceiveMessage", mock.Anything).Return(chID, nil, io.EOF).Maybe() @@ -433,7 +432,7 @@ func TestRouter_AcceptPeers(t *testing.T) { time.Sleep(time.Millisecond) } else { select { - case <-closer.Done(): + case <-connCtx.Done(): case <-time.After(100 * time.Millisecond): require.Fail(t, "connection not closed") } @@ -620,13 +619,14 @@ func TestRouter_DialPeers(t *testing.T) { endpoint := p2p.Endpoint{Protocol: "mock", Path: string(tc.dialID)} // Set up a mock transport that handshakes. - closer := tmsync.NewCloser() + connCtx, connCancel := context.WithCancel(context.Background()) + defer connCancel() mockConnection := &mocks.Connection{} mockConnection.On("String").Maybe().Return("mock") if tc.dialErr == nil { mockConnection.On("Handshake", mock.Anything, selfInfo, selfKey). Return(tc.peerInfo, tc.peerKey, nil) - mockConnection.On("Close").Run(func(_ mock.Arguments) { closer.Close() }).Return(nil).Maybe() + mockConnection.On("Close").Run(func(_ mock.Arguments) { connCancel() }).Return(nil).Maybe() } if tc.ok { mockConnection.On("ReceiveMessage", mock.Anything).Return(chID, nil, io.EOF).Maybe() @@ -644,7 +644,7 @@ func TestRouter_DialPeers(t *testing.T) { mockTransport.On("Dial", mock.Anything, endpoint).Maybe().Return(nil, io.EOF) } else { mockTransport.On("Dial", mock.Anything, endpoint).Once(). - Run(func(_ mock.Arguments) { closer.Close() }). + Run(func(_ mock.Arguments) { connCancel() }). 
                     Return(nil, tc.dialErr)
             }
 
@@ -681,7 +681,7 @@ func TestRouter_DialPeers(t *testing.T) {
                 time.Sleep(time.Millisecond)
             } else {
                 select {
-                case <-closer.Done():
+                case <-connCtx.Done():
                 case <-time.After(100 * time.Millisecond):
                     require.Fail(t, "connection not closed")
                 }
diff --git a/internal/p2p/transport_memory.go b/internal/p2p/transport_memory.go
index 27b9e77e1..f363c12be 100644
--- a/internal/p2p/transport_memory.go
+++ b/internal/p2p/transport_memory.go
@@ -9,7 +9,6 @@ import (
     "sync"
 
     "github.com/tendermint/tendermint/crypto"
-    tmsync "github.com/tendermint/tendermint/internal/libs/sync"
     "github.com/tendermint/tendermint/libs/log"
     "github.com/tendermint/tendermint/types"
 )
@@ -175,10 +174,17 @@ func (t *MemoryTransport) Dial(ctx context.Context, endpoint Endpoint) (Connecti
 
     inCh := make(chan memoryMessage, t.bufferSize)
     outCh := make(chan memoryMessage, t.bufferSize)
-    closer := tmsync.NewCloser()
 
-    outConn := newMemoryConnection(t.logger, t.nodeID, peer.nodeID, inCh, outCh, closer)
-    inConn := newMemoryConnection(peer.logger, peer.nodeID, t.nodeID, outCh, inCh, closer)
+    once := &sync.Once{}
+    closeCh := make(chan struct{})
+    closeFn := func() { once.Do(func() { close(closeCh) }) }
+
+    outConn := newMemoryConnection(t.logger, t.nodeID, peer.nodeID, inCh, outCh)
+    outConn.closeCh = closeCh
+    outConn.closeFn = closeFn
+    inConn := newMemoryConnection(peer.logger, peer.nodeID, t.nodeID, outCh, inCh)
+    inConn.closeCh = closeCh
+    inConn.closeFn = closeFn
 
     select {
     case peer.acceptCh <- inConn:
@@ -202,7 +208,9 @@ type MemoryConnection struct {
 
     receiveCh <-chan memoryMessage
     sendCh    chan<- memoryMessage
-    closer    *tmsync.Closer
+
+    closeFn func()
+    closeCh <-chan struct{}
 }
 
 // memoryMessage is passed internally, containing either a message or handshake.
@@ -222,7 +230,6 @@ func newMemoryConnection(
     remoteID types.NodeID,
     receiveCh <-chan memoryMessage,
     sendCh chan<- memoryMessage,
-    closer *tmsync.Closer,
 ) *MemoryConnection {
     return &MemoryConnection{
         logger:    logger.With("remote", remoteID),
@@ -230,7 +237,6 @@ func newMemoryConnection(
         remoteID:  remoteID,
         receiveCh: receiveCh,
         sendCh:    sendCh,
-        closer:    closer,
     }
 }
 
@@ -264,7 +270,7 @@ func (c *MemoryConnection) Handshake(
     select {
     case c.sendCh <- memoryMessage{nodeInfo: &nodeInfo, pubKey: privKey.PubKey()}:
         c.logger.Debug("sent handshake", "nodeInfo", nodeInfo)
-    case <-c.closer.Done():
+    case <-c.closeCh:
         return types.NodeInfo{}, nil, io.EOF
     case <-ctx.Done():
         return types.NodeInfo{}, nil, ctx.Err()
@@ -277,7 +283,7 @@ func (c *MemoryConnection) Handshake(
         }
         c.logger.Debug("received handshake", "peerInfo", msg.nodeInfo)
         return *msg.nodeInfo, msg.pubKey, nil
-    case <-c.closer.Done():
+    case <-c.closeCh:
        return types.NodeInfo{}, nil, io.EOF
     case <-ctx.Done():
         return types.NodeInfo{}, nil, ctx.Err()
@@ -289,7 +295,7 @@ func (c *MemoryConnection) ReceiveMessage(ctx context.Context) (ChannelID, []byt
     // Check close first, since channels are buffered. Otherwise, below select
     // may non-deterministically return non-error even when closed.
     select {
-    case <-c.closer.Done():
+    case <-c.closeCh:
         return 0, nil, io.EOF
     case <-ctx.Done():
         return 0, nil, io.EOF
@@ -300,7 +306,9 @@ func (c *MemoryConnection) ReceiveMessage(ctx context.Context) (ChannelID, []byt
     case msg := <-c.receiveCh:
         c.logger.Debug("received message", "chID", msg.channelID, "msg", msg.message)
         return msg.channelID, msg.message, nil
-    case <-c.closer.Done():
+    case <-ctx.Done():
+        return 0, nil, io.EOF
+    case <-c.closeCh:
         return 0, nil, io.EOF
     }
 }
@@ -310,7 +318,7 @@ func (c *MemoryConnection) SendMessage(ctx context.Context, chID ChannelID, msg
     // Check close first, since channels are buffered. Otherwise, below select
     // may non-deterministically return non-error even when closed.
     select {
-    case <-c.closer.Done():
+    case <-c.closeCh:
         return io.EOF
     case <-ctx.Done():
         return io.EOF
@@ -323,19 +331,10 @@ func (c *MemoryConnection) SendMessage(ctx context.Context, chID ChannelID, msg
         return nil
     case <-ctx.Done():
         return io.EOF
-    case <-c.closer.Done():
+    case <-c.closeCh:
         return io.EOF
     }
 }
 
 // Close implements Connection.
-func (c *MemoryConnection) Close() error {
-    select {
-    case <-c.closer.Done():
-        return nil
-    default:
-        c.closer.Close()
-        c.logger.Info("closed connection")
-    }
-    return nil
-}
+func (c *MemoryConnection) Close() error { c.closeFn(); return nil }
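
Note: the sketch below is not part of the patch above. It is a minimal, self-contained illustration of the close-once pattern the patch adopts everywhere it removes tmsync.Closer: a closeFn guarded by sync.Once that closes a signalling channel exactly once, while any number of goroutines wait on the channel. The doneSignal type and all names in it are hypothetical and exist only for this example.

// Package main sketches the sync.Once + close(chan) pattern used in place of
// tmsync.Closer. All identifiers here are illustrative, not part of the
// tendermint p2p package.
package main

import (
	"fmt"
	"sync"
	"time"
)

// doneSignal pairs a close function with the channel it closes, mirroring the
// closeFn/closeCh fields added to fifoQueue, pqScheduler, and MemoryConnection.
type doneSignal struct {
	closeFn func()
	closeCh <-chan struct{}
}

func newDoneSignal() doneSignal {
	ch := make(chan struct{})
	once := &sync.Once{}
	return doneSignal{
		// Closing an already-closed channel panics, so the close is wrapped
		// in sync.Once to make repeated calls safe.
		closeFn: func() { once.Do(func() { close(ch) }) },
		closeCh: ch,
	}
}

func main() {
	sig := newDoneSignal()

	go func() {
		<-sig.closeCh // blocks until the first closeFn call
		fmt.Println("observed close")
	}()

	// Multiple calls are safe; only the first one closes the channel.
	sig.closeFn()
	sig.closeFn()

	time.Sleep(10 * time.Millisecond)
}

Closing a channel broadcasts to every receiver, and the sync.Once guard makes repeated close calls harmless, which is the behaviour the deleted Closer type provided without requiring a separate package.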