mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-12 23:01:30 +00:00
Merge branch 'master' into wb/epoch-fixes-forward-port-master
This commit is contained in:
@@ -222,8 +222,6 @@ func (r *Reactor) OnStart(ctx context.Context) error {
|
||||
// blocking until they all exit, as well as unsubscribing from events and stopping
|
||||
// state.
|
||||
func (r *Reactor) OnStop() {
|
||||
r.unsubscribeFromBroadcastEvents()
|
||||
|
||||
r.state.Stop()
|
||||
|
||||
if !r.WaitSync() {
|
||||
@@ -397,10 +395,6 @@ func (r *Reactor) subscribeToBroadcastEvents() {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Reactor) unsubscribeFromBroadcastEvents() {
|
||||
r.state.evsw.RemoveListener(listenerIDConsensus)
|
||||
}
|
||||
|
||||
func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep {
|
||||
return &tmcons.NewRoundStep{
|
||||
Height: rs.Height,
|
||||
|
||||
@@ -50,8 +50,6 @@ type EventSwitch interface {
|
||||
Stop()
|
||||
|
||||
AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error
|
||||
RemoveListenerForEvent(event string, listenerID string)
|
||||
RemoveListener(listenerID string)
|
||||
}
|
||||
|
||||
type eventSwitch struct {
|
||||
@@ -71,11 +69,8 @@ func NewEventSwitch(logger log.Logger) EventSwitch {
|
||||
return evsw
|
||||
}
|
||||
|
||||
func (evsw *eventSwitch) OnStart(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (evsw *eventSwitch) OnStop() {}
|
||||
func (evsw *eventSwitch) OnStart(ctx context.Context) error { return nil }
|
||||
func (evsw *eventSwitch) OnStop() {}
|
||||
|
||||
func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error {
|
||||
// Get/Create eventCell and listener.
|
||||
@@ -103,52 +98,6 @@ func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb E
|
||||
return nil
|
||||
}
|
||||
|
||||
func (evsw *eventSwitch) RemoveListener(listenerID string) {
|
||||
// Get and remove listener.
|
||||
evsw.mtx.RLock()
|
||||
listener := evsw.listeners[listenerID]
|
||||
evsw.mtx.RUnlock()
|
||||
if listener == nil {
|
||||
return
|
||||
}
|
||||
|
||||
evsw.mtx.Lock()
|
||||
delete(evsw.listeners, listenerID)
|
||||
evsw.mtx.Unlock()
|
||||
|
||||
// Remove callback for each event.
|
||||
listener.SetRemoved()
|
||||
for _, event := range listener.GetEvents() {
|
||||
evsw.RemoveListenerForEvent(event, listenerID)
|
||||
}
|
||||
}
|
||||
|
||||
func (evsw *eventSwitch) RemoveListenerForEvent(event string, listenerID string) {
|
||||
// Get eventCell
|
||||
evsw.mtx.Lock()
|
||||
eventCell := evsw.eventCells[event]
|
||||
evsw.mtx.Unlock()
|
||||
|
||||
if eventCell == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Remove listenerID from eventCell
|
||||
numListeners := eventCell.RemoveListener(listenerID)
|
||||
|
||||
// Maybe garbage collect eventCell.
|
||||
if numListeners == 0 {
|
||||
// Lock again and double check.
|
||||
evsw.mtx.Lock() // OUTER LOCK
|
||||
eventCell.mtx.Lock() // INNER LOCK
|
||||
if len(eventCell.listeners) == 0 {
|
||||
delete(evsw.eventCells, event)
|
||||
}
|
||||
eventCell.mtx.Unlock() // INNER LOCK
|
||||
evsw.mtx.Unlock() // OUTER LOCK
|
||||
}
|
||||
}
|
||||
|
||||
func (evsw *eventSwitch) FireEvent(ctx context.Context, event string, data EventData) {
|
||||
// Get the eventCell
|
||||
evsw.mtx.RLock()
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
@@ -28,8 +27,6 @@ func TestAddListenerForEventFireOnce(t *testing.T) {
|
||||
messages := make(chan EventData)
|
||||
require.NoError(t, evsw.AddListenerForEvent("listener", "event",
|
||||
func(ctx context.Context, data EventData) error {
|
||||
// test there's no deadlock if we remove the listener inside a callback
|
||||
evsw.RemoveListener("listener")
|
||||
select {
|
||||
case messages <- data:
|
||||
return nil
|
||||
@@ -234,171 +231,7 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddAndRemoveListenerConcurrency(t *testing.T) {
|
||||
var (
|
||||
stopInputEvent = false
|
||||
roundCount = 2000
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
logger := log.NewTestingLogger(t)
|
||||
|
||||
evsw := NewEventSwitch(logger)
|
||||
require.NoError(t, evsw.Start(ctx))
|
||||
t.Cleanup(evsw.Wait)
|
||||
|
||||
done1 := make(chan struct{})
|
||||
done2 := make(chan struct{})
|
||||
|
||||
// Must be executed concurrently to uncover the data race.
|
||||
// 1. RemoveListener
|
||||
go func() {
|
||||
defer close(done1)
|
||||
for i := 0; i < roundCount; i++ {
|
||||
evsw.RemoveListener("listener")
|
||||
}
|
||||
}()
|
||||
|
||||
// 2. AddListenerForEvent
|
||||
go func() {
|
||||
defer close(done2)
|
||||
for i := 0; i < roundCount; i++ {
|
||||
index := i
|
||||
// we explicitly ignore errors here, since the listener will sometimes be removed
|
||||
// (that's what we're testing)
|
||||
_ = evsw.AddListenerForEvent("listener", fmt.Sprintf("event%d", index),
|
||||
func(ctx context.Context, data EventData) error {
|
||||
t.Errorf("should not run callback for %d.\n", index)
|
||||
stopInputEvent = true
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}()
|
||||
|
||||
<-done1
|
||||
<-done2
|
||||
|
||||
evsw.RemoveListener("listener") // remove the last listener
|
||||
|
||||
for i := 0; i < roundCount && !stopInputEvent; i++ {
|
||||
evsw.FireEvent(ctx, fmt.Sprintf("event%d", i), uint64(1001))
|
||||
}
|
||||
}
|
||||
|
||||
// TestAddAndRemoveListener sets up an EventSwitch, subscribes a listener to
|
||||
// two events, fires a thousand integers for the first event, then unsubscribes
|
||||
// the listener and fires a thousand integers for the second event.
|
||||
func TestAddAndRemoveListener(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
logger := log.NewTestingLogger(t)
|
||||
evsw := NewEventSwitch(logger)
|
||||
require.NoError(t, evsw.Start(ctx))
|
||||
t.Cleanup(evsw.Wait)
|
||||
|
||||
doneSum1 := make(chan uint64)
|
||||
doneSum2 := make(chan uint64)
|
||||
doneSending1 := make(chan uint64)
|
||||
doneSending2 := make(chan uint64)
|
||||
numbers1 := make(chan uint64, 4)
|
||||
numbers2 := make(chan uint64, 4)
|
||||
// subscribe two listener to three events
|
||||
require.NoError(t, evsw.AddListenerForEvent("listener", "event1",
|
||||
func(ctx context.Context, data EventData) error {
|
||||
select {
|
||||
case numbers1 <- data.(uint64):
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}))
|
||||
require.NoError(t, evsw.AddListenerForEvent("listener", "event2",
|
||||
func(ctx context.Context, data EventData) error {
|
||||
select {
|
||||
case numbers2 <- data.(uint64):
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}))
|
||||
// collect received events for event1
|
||||
go sumReceivedNumbers(numbers1, doneSum1)
|
||||
// collect received events for event2
|
||||
go sumReceivedNumbers(numbers2, doneSum2)
|
||||
// go fire events
|
||||
go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1))
|
||||
checkSumEvent1 := <-doneSending1
|
||||
// after sending all event1, unsubscribe for all events
|
||||
evsw.RemoveListener("listener")
|
||||
go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001))
|
||||
checkSumEvent2 := <-doneSending2
|
||||
close(numbers1)
|
||||
close(numbers2)
|
||||
eventSum1 := <-doneSum1
|
||||
eventSum2 := <-doneSum2
|
||||
if checkSumEvent1 != eventSum1 ||
|
||||
// correct value asserted by preceding tests, suffices to be non-zero
|
||||
checkSumEvent2 == uint64(0) ||
|
||||
eventSum2 != uint64(0) {
|
||||
t.Errorf("not all messages sent were received or unsubscription did not register.\n")
|
||||
}
|
||||
}
|
||||
|
||||
// TestRemoveListener does basic tests on adding and removing
|
||||
func TestRemoveListener(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
logger := log.NewTestingLogger(t)
|
||||
|
||||
evsw := NewEventSwitch(logger)
|
||||
require.NoError(t, evsw.Start(ctx))
|
||||
t.Cleanup(evsw.Wait)
|
||||
|
||||
count := 10
|
||||
sum1, sum2 := 0, 0
|
||||
// add some listeners and make sure they work
|
||||
require.NoError(t, evsw.AddListenerForEvent("listener", "event1",
|
||||
func(ctx context.Context, data EventData) error {
|
||||
sum1++
|
||||
return nil
|
||||
}))
|
||||
require.NoError(t, evsw.AddListenerForEvent("listener", "event2",
|
||||
func(ctx context.Context, data EventData) error {
|
||||
sum2++
|
||||
return nil
|
||||
}))
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
evsw.FireEvent(ctx, "event1", true)
|
||||
evsw.FireEvent(ctx, "event2", true)
|
||||
}
|
||||
assert.Equal(t, count, sum1)
|
||||
assert.Equal(t, count, sum2)
|
||||
|
||||
// remove one by event and make sure it is gone
|
||||
evsw.RemoveListenerForEvent("event2", "listener")
|
||||
for i := 0; i < count; i++ {
|
||||
evsw.FireEvent(ctx, "event1", true)
|
||||
evsw.FireEvent(ctx, "event2", true)
|
||||
}
|
||||
assert.Equal(t, count*2, sum1)
|
||||
assert.Equal(t, count, sum2)
|
||||
|
||||
// remove the listener entirely and make sure both gone
|
||||
evsw.RemoveListener("listener")
|
||||
for i := 0; i < count; i++ {
|
||||
evsw.FireEvent(ctx, "event1", true)
|
||||
evsw.FireEvent(ctx, "event2", true)
|
||||
}
|
||||
assert.Equal(t, count*2, sum1)
|
||||
assert.Equal(t, count, sum2)
|
||||
}
|
||||
|
||||
// TestAddAndRemoveListenersAsync sets up an EventSwitch, subscribes two
|
||||
// TestManagerLiistenersAsync sets up an EventSwitch, subscribes two
|
||||
// listeners to three events, and fires a thousand integers for each event.
|
||||
// These two listeners serve as the baseline validation while other listeners
|
||||
// are randomly subscribed and unsubscribed.
|
||||
@@ -408,7 +241,7 @@ func TestRemoveListener(t *testing.T) {
|
||||
// at that point subscribed to.
|
||||
// NOTE: it is important to run this test with race conditions tracking on,
|
||||
// `go test -race`, to examine for possible race conditions.
|
||||
func TestRemoveListenersAsync(t *testing.T) {
|
||||
func TestManageListenersAsync(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
logger := log.NewTestingLogger(t)
|
||||
@@ -494,18 +327,9 @@ func TestRemoveListenersAsync(t *testing.T) {
|
||||
func(context.Context, EventData) error { return nil })
|
||||
}
|
||||
}
|
||||
removeListenersStress := func() {
|
||||
r2 := rand.New(rand.NewSource(time.Now().Unix()))
|
||||
r2.Seed(time.Now().UnixNano())
|
||||
for k := uint16(0); k < 80; k++ {
|
||||
listenerNumber := r2.Intn(100) + 3
|
||||
go evsw.RemoveListener(fmt.Sprintf("listener%v", listenerNumber))
|
||||
}
|
||||
}
|
||||
addListenersStress()
|
||||
// go fire events
|
||||
go fireEvents(ctx, evsw, "event1", doneSending1, uint64(1))
|
||||
removeListenersStress()
|
||||
go fireEvents(ctx, evsw, "event2", doneSending2, uint64(1001))
|
||||
go fireEvents(ctx, evsw, "event3", doneSending3, uint64(2001))
|
||||
checkSumEvent1 := <-doneSending1
|
||||
|
||||
Reference in New Issue
Block a user