diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 6ddc968ff..2f27afc81 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -220,7 +220,7 @@ func NewState( doWALCatchup: true, wal: nilWAL{}, evpool: evpool, - evsw: tmevents.NewEventSwitch(logger), + evsw: tmevents.NewEventSwitch(), metrics: NopMetrics(), onStopCh: make(chan *cstypes.RoundState), } diff --git a/libs/events/events.go b/libs/events/events.go index d96afd7bd..5ad1170a9 100644 --- a/libs/events/events.go +++ b/libs/events/events.go @@ -3,22 +3,9 @@ package events import ( "context" - "fmt" "sync" - - "github.com/tendermint/tendermint/libs/log" ) -// ErrListenerWasRemoved is returned by AddEvent if the listener was removed. -type ErrListenerWasRemoved struct { - listenerID string -} - -// Error implements the error interface. -func (e ErrListenerWasRemoved) Error() string { - return fmt.Sprintf("listener #%s was removed", e.listenerID) -} - // EventData is a generic event data can be typed and registered with // tendermint/go-amino via concrete implementation of this interface. type EventData interface{} @@ -51,13 +38,11 @@ type EventSwitch interface { type eventSwitch struct { mtx sync.RWMutex eventCells map[string]*eventCell - listeners map[string]*eventListener } -func NewEventSwitch(logger log.Logger) EventSwitch { +func NewEventSwitch() EventSwitch { evsw := &eventSwitch{ eventCells: make(map[string]*eventCell), - listeners: make(map[string]*eventListener), } return evsw } @@ -71,20 +56,9 @@ func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb E eventCell = newEventCell() evsw.eventCells[eventValue] = eventCell } - - listener := evsw.listeners[listenerID] - if listener == nil { - listener = newEventListener(listenerID) - evsw.listeners[listenerID] = listener - } - evsw.mtx.Unlock() - if err := listener.AddEvent(eventValue); err != nil { - return err - } - - eventCell.AddListener(listenerID, cb) + eventCell.addListener(listenerID, cb) return nil } @@ -99,11 +73,13 @@ func (evsw *eventSwitch) FireEvent(ctx context.Context, event string, data Event } // Fire event for all listeners in eventCell - eventCell.FireEvent(ctx, data) + eventCell.fireEvent(ctx, data) } //----------------------------------------------------------------------------- +type EventCallback func(ctx context.Context, data EventData) error + // eventCell handles keeping track of listener callbacks for a given event. type eventCell struct { mtx sync.RWMutex @@ -116,21 +92,13 @@ func newEventCell() *eventCell { } } -func (cell *eventCell) AddListener(listenerID string, cb EventCallback) { +func (cell *eventCell) addListener(listenerID string, cb EventCallback) { cell.mtx.Lock() + defer cell.mtx.Unlock() cell.listeners[listenerID] = cb - cell.mtx.Unlock() } -func (cell *eventCell) RemoveListener(listenerID string) int { - cell.mtx.Lock() - delete(cell.listeners, listenerID) - numListeners := len(cell.listeners) - cell.mtx.Unlock() - return numListeners -} - -func (cell *eventCell) FireEvent(ctx context.Context, data EventData) { +func (cell *eventCell) fireEvent(ctx context.Context, data EventData) { cell.mtx.RLock() eventCallbacks := make([]EventCallback, 0, len(cell.listeners)) for _, cb := range cell.listeners { @@ -145,50 +113,3 @@ func (cell *eventCell) FireEvent(ctx context.Context, data EventData) { } } } - -//----------------------------------------------------------------------------- - -type EventCallback func(ctx context.Context, data EventData) error - -type eventListener struct { - id string - - mtx sync.RWMutex - removed bool - events []string -} - -func newEventListener(id string) *eventListener { - return &eventListener{ - id: id, - removed: false, - events: nil, - } -} - -func (evl *eventListener) AddEvent(event string) error { - evl.mtx.Lock() - - if evl.removed { - evl.mtx.Unlock() - return ErrListenerWasRemoved{listenerID: evl.id} - } - - evl.events = append(evl.events, event) - evl.mtx.Unlock() - return nil -} - -func (evl *eventListener) GetEvents() []string { - evl.mtx.RLock() - events := make([]string, len(evl.events)) - copy(events, evl.events) - evl.mtx.RUnlock() - return events -} - -func (evl *eventListener) SetRemoved() { - evl.mtx.Lock() - evl.removed = true - evl.mtx.Unlock() -} diff --git a/libs/events/events_test.go b/libs/events/events_test.go index 55e0103da..fba2feba8 100644 --- a/libs/events/events_test.go +++ b/libs/events/events_test.go @@ -8,8 +8,6 @@ import ( "time" "github.com/stretchr/testify/require" - - "github.com/tendermint/tendermint/libs/log" ) // TestAddListenerForEventFireOnce sets up an EventSwitch, subscribes a single @@ -18,9 +16,7 @@ func TestAddListenerForEventFireOnce(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) - - evsw := NewEventSwitch(logger) + evsw := NewEventSwitch() messages := make(chan EventData) require.NoError(t, evsw.AddListenerForEvent("listener", "event", @@ -45,9 +41,7 @@ func TestAddListenerForEventFireMany(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) - - evsw := NewEventSwitch(logger) + evsw := NewEventSwitch() doneSum := make(chan uint64) doneSending := make(chan uint64) @@ -81,9 +75,7 @@ func TestAddListenerForDifferentEvents(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) - - evsw := NewEventSwitch(logger) + evsw := NewEventSwitch() doneSum := make(chan uint64) doneSending1 := make(chan uint64) @@ -143,8 +135,7 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) - evsw := NewEventSwitch(logger) + evsw := NewEventSwitch() doneSum1 := make(chan uint64) doneSum2 := make(chan uint64) @@ -235,9 +226,8 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) { func TestManageListenersAsync(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) - evsw := NewEventSwitch(logger) + evsw := NewEventSwitch() doneSum1 := make(chan uint64) doneSum2 := make(chan uint64)