From 989a2f32b1411d159be436d64136248f02289fba Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 9 Oct 2018 15:09:40 +0400 Subject: [PATCH] libs: Refactor & document events code (#2576) * [libs/events] add more godoc comments * [libs/events] refactor code - improve var naming - improve code structure - do not use defers for unlocking mutexes (defer takes time) --- libs/events/events.go | 66 +++++++++++++++++++++----------------- libs/events/events_test.go | 59 +++++++++++++++------------------- 2 files changed, 63 insertions(+), 62 deletions(-) diff --git a/libs/events/events.go b/libs/events/events.go index 864365563..fb90bbea6 100644 --- a/libs/events/events.go +++ b/libs/events/events.go @@ -1,6 +1,4 @@ -/* -Pub-Sub in go with event caching -*/ +// Package events - Pub-Sub in go with event caching package events import ( @@ -10,30 +8,40 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" ) +// ErrListenerWasRemoved is returned by AddEvent if the listener was removed. type ErrListenerWasRemoved struct { - listener string + listenerID string } +// Error implements the error interface. func (e ErrListenerWasRemoved) Error() string { - return fmt.Sprintf("listener %s was removed", e.listener) + return fmt.Sprintf("listener #%s was removed", e.listenerID) } -// Generic event data can be typed and registered with tendermint/go-amino -// via concrete implementation of this interface -type EventData interface { -} +// EventData is a generic event data can be typed and registered with +// tendermint/go-amino via concrete implementation of this interface. +type EventData interface{} -// reactors and other modules should export -// this interface to become eventable +// Eventable is the interface reactors and other modules must export to become +// eventable. type Eventable interface { SetEventSwitch(evsw EventSwitch) } -// an event switch or cache implements fireable +// Fireable is the interface that wraps the FireEvent method. +// +// FireEvent fires an event with the given name and data. type Fireable interface { FireEvent(event string, data EventData) } +// EventSwitch is the interface for synchronous pubsub, where listeners +// subscribe to certain events and, when an event is fired (see Fireable), +// notified via a callback function. +// +// Listeners are added by calling AddListenerForEvent function. +// They can be removed by calling either RemoveListenerForEvent or +// RemoveListener (for all events). type EventSwitch interface { cmn.Service Fireable @@ -67,7 +75,7 @@ func (evsw *eventSwitch) OnStart() error { func (evsw *eventSwitch) OnStop() {} func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventCallback) error { - // Get/Create eventCell and listener + // Get/Create eventCell and listener. evsw.mtx.Lock() eventCell := evsw.eventCells[event] if eventCell == nil { @@ -81,17 +89,17 @@ func (evsw *eventSwitch) AddListenerForEvent(listenerID, event string, cb EventC } evsw.mtx.Unlock() - // Add event and listener - err := listener.AddEvent(event) - if err == nil { - eventCell.AddListener(listenerID, cb) + // Add event and listener. + if err := listener.AddEvent(event); err != nil { + return err } + eventCell.AddListener(listenerID, cb) - return err + return nil } func (evsw *eventSwitch) RemoveListener(listenerID string) { - // Get and remove listener + // Get and remove listener. evsw.mtx.RLock() listener := evsw.listeners[listenerID] evsw.mtx.RUnlock() @@ -180,14 +188,14 @@ func (cell *eventCell) RemoveListener(listenerID string) int { func (cell *eventCell) FireEvent(data EventData) { cell.mtx.RLock() - var listenerCopy []EventCallback - for _, listener := range cell.listeners { - listenerCopy = append(listenerCopy, listener) + var eventCallbacks []EventCallback + for _, cb := range cell.listeners { + eventCallbacks = append(eventCallbacks, cb) } cell.mtx.RUnlock() - for _, listener := range listenerCopy { - listener(data) + for _, cb := range eventCallbacks { + cb(data) } } @@ -213,27 +221,27 @@ func newEventListener(id string) *eventListener { func (evl *eventListener) AddEvent(event string) error { evl.mtx.Lock() - defer evl.mtx.Unlock() if evl.removed { - return ErrListenerWasRemoved{listener: evl.id} + evl.mtx.Unlock() + return ErrListenerWasRemoved{listenerID: evl.id} } evl.events = append(evl.events, event) + evl.mtx.Unlock() return nil } func (evl *eventListener) GetEvents() []string { evl.mtx.RLock() - defer evl.mtx.RUnlock() - events := make([]string, len(evl.events)) copy(events, evl.events) + evl.mtx.RUnlock() return events } func (evl *eventListener) SetRemoved() { evl.mtx.Lock() - defer evl.mtx.Unlock() evl.removed = true + evl.mtx.Unlock() } diff --git a/libs/events/events_test.go b/libs/events/events_test.go index 02ec44c4f..7530afa98 100644 --- a/libs/events/events_test.go +++ b/libs/events/events_test.go @@ -6,6 +6,8 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + cmn "github.com/tendermint/tendermint/libs/common" ) @@ -14,10 +16,9 @@ import ( func TestAddListenerForEventFireOnce(t *testing.T) { evsw := NewEventSwitch() err := evsw.Start() - if err != nil { - t.Errorf("Failed to start EventSwitch, error: %v", err) - } + require.NoError(t, err) defer evsw.Stop() + messages := make(chan EventData) evsw.AddListenerForEvent("listener", "event", func(data EventData) { @@ -35,10 +36,9 @@ func TestAddListenerForEventFireOnce(t *testing.T) { func TestAddListenerForEventFireMany(t *testing.T) { evsw := NewEventSwitch() err := evsw.Start() - if err != nil { - t.Errorf("Failed to start EventSwitch, error: %v", err) - } + require.NoError(t, err) defer evsw.Stop() + doneSum := make(chan uint64) doneSending := make(chan uint64) numbers := make(chan uint64, 4) @@ -65,10 +65,9 @@ func TestAddListenerForEventFireMany(t *testing.T) { func TestAddListenerForDifferentEvents(t *testing.T) { evsw := NewEventSwitch() err := evsw.Start() - if err != nil { - t.Errorf("Failed to start EventSwitch, error: %v", err) - } + require.NoError(t, err) defer evsw.Stop() + doneSum := make(chan uint64) doneSending1 := make(chan uint64) doneSending2 := make(chan uint64) @@ -111,10 +110,9 @@ func TestAddListenerForDifferentEvents(t *testing.T) { func TestAddDifferentListenerForDifferentEvents(t *testing.T) { evsw := NewEventSwitch() err := evsw.Start() - if err != nil { - t.Errorf("Failed to start EventSwitch, error: %v", err) - } + require.NoError(t, err) defer evsw.Stop() + doneSum1 := make(chan uint64) doneSum2 := make(chan uint64) doneSending1 := make(chan uint64) @@ -174,40 +172,38 @@ func TestAddAndRemoveListenerConcurrency(t *testing.T) { evsw := NewEventSwitch() err := evsw.Start() - if err != nil { - t.Errorf("Failed to start EventSwitch, error: %v", err) - } + require.NoError(t, err) defer evsw.Stop() done1 := make(chan struct{}) done2 := make(chan struct{}) + // Must be executed concurrently to uncover the data race. + // 1. RemoveListener go func() { for i := 0; i < roundCount; i++ { evsw.RemoveListener("listener") } - done1 <- struct{}{} + close(done1) }() + // 2. AddListenerForEvent go func() { for i := 0; i < roundCount; i++ { - index := i //it necessary for closure + index := i evsw.AddListenerForEvent("listener", fmt.Sprintf("event%d", index), func(data EventData) { t.Errorf("should not run callback for %d.\n", index) stopInputEvent = true }) } - done2 <- struct{}{} + close(done2) }() <-done1 <-done2 - close(done1) - close(done2) - - evsw.RemoveListener("listener") // make sure remove last + evsw.RemoveListener("listener") // remove the last listener for i := 0; i < roundCount && !stopInputEvent; i++ { evsw.FireEvent(fmt.Sprintf("event%d", i), uint64(1001)) @@ -220,10 +216,9 @@ func TestAddAndRemoveListenerConcurrency(t *testing.T) { func TestAddAndRemoveListener(t *testing.T) { evsw := NewEventSwitch() err := evsw.Start() - if err != nil { - t.Errorf("Failed to start EventSwitch, error: %v", err) - } + require.NoError(t, err) defer evsw.Stop() + doneSum1 := make(chan uint64) doneSum2 := make(chan uint64) doneSending1 := make(chan uint64) @@ -266,10 +261,9 @@ func TestAddAndRemoveListener(t *testing.T) { func TestRemoveListener(t *testing.T) { evsw := NewEventSwitch() err := evsw.Start() - if err != nil { - t.Errorf("Failed to start EventSwitch, error: %v", err) - } + require.NoError(t, err) defer evsw.Stop() + count := 10 sum1, sum2 := 0, 0 // add some listeners and make sure they work @@ -320,10 +314,9 @@ func TestRemoveListener(t *testing.T) { func TestRemoveListenersAsync(t *testing.T) { evsw := NewEventSwitch() err := evsw.Start() - if err != nil { - t.Errorf("Failed to start EventSwitch, error: %v", err) - } + require.NoError(t, err) defer evsw.Stop() + doneSum1 := make(chan uint64) doneSum2 := make(chan uint64) doneSending1 := make(chan uint64) @@ -406,7 +399,7 @@ func TestRemoveListenersAsync(t *testing.T) { // until the receiving channel `numbers` is closed; it then sends the sum // on `doneSum` and closes that channel. Expected to be run in a go-routine. func sumReceivedNumbers(numbers, doneSum chan uint64) { - var sum uint64 = 0 + var sum uint64 for { j, more := <-numbers sum += j @@ -425,7 +418,7 @@ func sumReceivedNumbers(numbers, doneSum chan uint64) { // the test to assert all events have also been received. func fireEvents(evsw EventSwitch, event string, doneChan chan uint64, offset uint64) { - var sentSum uint64 = 0 + var sentSum uint64 for i := offset; i <= offset+uint64(999); i++ { sentSum += i evsw.FireEvent(event, i)