diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 5588a5b70..9ca8efdda 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -220,7 +220,7 @@ func NewState( doWALCatchup: true, wal: nilWAL{}, evpool: evpool, - evsw: tmevents.NewEventSwitch(logger), + evsw: tmevents.NewEventSwitch(), metrics: NopMetrics(), onStopCh: make(chan *cstypes.RoundState), } diff --git a/internal/state/time.go b/internal/state/time.go deleted file mode 100644 index c0770b3af..000000000 --- a/internal/state/time.go +++ /dev/null @@ -1,46 +0,0 @@ -package state - -import ( - "sort" - "time" -) - -// weightedTime for computing a median. -type weightedTime struct { - Time time.Time - Weight int64 -} - -// newWeightedTime with time and weight. -func newWeightedTime(time time.Time, weight int64) *weightedTime { - return &weightedTime{ - Time: time, - Weight: weight, - } -} - -// weightedMedian computes weighted median time for a given array of WeightedTime and the total voting power. -func weightedMedian(weightedTimes []*weightedTime, totalVotingPower int64) (res time.Time) { - median := totalVotingPower / 2 - - sort.Slice(weightedTimes, func(i, j int) bool { - if weightedTimes[i] == nil { - return false - } - if weightedTimes[j] == nil { - return true - } - return weightedTimes[i].Time.UnixNano() < weightedTimes[j].Time.UnixNano() - }) - - for _, weightedTime := range weightedTimes { - if weightedTime != nil { - if median <= weightedTime.Weight { - res = weightedTime.Time - break - } - median -= weightedTime.Weight - } - } - return -} diff --git a/internal/state/time_test.go b/internal/state/time_test.go deleted file mode 100644 index 5da97e819..000000000 --- a/internal/state/time_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package state - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - - tmtime "github.com/tendermint/tendermint/libs/time" -) - -func TestWeightedMedian(t *testing.T) { - m := make([]*weightedTime, 3) - - t1 := tmtime.Now() - t2 := t1.Add(5 * time.Second) - t3 := t1.Add(10 * time.Second) - - m[2] = newWeightedTime(t1, 33) // faulty processes - m[0] = newWeightedTime(t2, 40) // correct processes - m[1] = newWeightedTime(t3, 27) // correct processes - totalVotingPower := int64(100) - - median := weightedMedian(m, totalVotingPower) - assert.Equal(t, t2, median) - // median always returns value between values of correct processes - assert.Equal(t, true, (median.After(t1) || median.Equal(t1)) && - (median.Before(t3) || median.Equal(t3))) - - m[1] = newWeightedTime(t1, 40) // correct processes - m[2] = newWeightedTime(t2, 27) // correct processes - m[0] = newWeightedTime(t3, 33) // faulty processes - totalVotingPower = int64(100) - - median = weightedMedian(m, totalVotingPower) - assert.Equal(t, t2, median) - // median always returns value between values of correct processes - assert.Equal(t, true, (median.After(t1) || median.Equal(t1)) && - (median.Before(t2) || median.Equal(t2))) - - m = make([]*weightedTime, 8) - t4 := t1.Add(15 * time.Second) - t5 := t1.Add(60 * time.Second) - - m[3] = newWeightedTime(t1, 10) // correct processes - m[1] = newWeightedTime(t2, 10) // correct processes - m[5] = newWeightedTime(t2, 10) // correct processes - m[4] = newWeightedTime(t3, 23) // faulty processes - m[0] = newWeightedTime(t4, 20) // correct processes - m[7] = newWeightedTime(t5, 10) // faulty processes - totalVotingPower = int64(83) - - median = weightedMedian(m, totalVotingPower) - assert.Equal(t, t3, median) - // median always returns value between values of correct processes - assert.Equal(t, true, (median.After(t1) || median.Equal(t1)) && - (median.Before(t4) || median.Equal(t4))) -} diff --git a/libs/events/events.go b/libs/events/events.go index d96afd7bd..5ad1170a9 100644 --- a/libs/events/events.go +++ b/libs/events/events.go @@ -3,22 +3,9 @@ package events import ( "context" - "fmt" "sync" - - "github.com/tendermint/tendermint/libs/log" ) -// ErrListenerWasRemoved is returned by AddEvent if the listener was removed. -type ErrListenerWasRemoved struct { - listenerID string -} - -// Error implements the error interface. -func (e ErrListenerWasRemoved) Error() string { - return fmt.Sprintf("listener #%s was removed", e.listenerID) -} - // EventData is a generic event data can be typed and registered with // tendermint/go-amino via concrete implementation of this interface. type EventData interface{} @@ -51,13 +38,11 @@ type EventSwitch interface { type eventSwitch struct { mtx sync.RWMutex eventCells map[string]*eventCell - listeners map[string]*eventListener } -func NewEventSwitch(logger log.Logger) EventSwitch { +func NewEventSwitch() EventSwitch { evsw := &eventSwitch{ eventCells: make(map[string]*eventCell), - listeners: make(map[string]*eventListener), } return evsw } @@ -71,20 +56,9 @@ func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb E eventCell = newEventCell() evsw.eventCells[eventValue] = eventCell } - - listener := evsw.listeners[listenerID] - if listener == nil { - listener = newEventListener(listenerID) - evsw.listeners[listenerID] = listener - } - evsw.mtx.Unlock() - if err := listener.AddEvent(eventValue); err != nil { - return err - } - - eventCell.AddListener(listenerID, cb) + eventCell.addListener(listenerID, cb) return nil } @@ -99,11 +73,13 @@ func (evsw *eventSwitch) FireEvent(ctx context.Context, event string, data Event } // Fire event for all listeners in eventCell - eventCell.FireEvent(ctx, data) + eventCell.fireEvent(ctx, data) } //----------------------------------------------------------------------------- +type EventCallback func(ctx context.Context, data EventData) error + // eventCell handles keeping track of listener callbacks for a given event. type eventCell struct { mtx sync.RWMutex @@ -116,21 +92,13 @@ func newEventCell() *eventCell { } } -func (cell *eventCell) AddListener(listenerID string, cb EventCallback) { +func (cell *eventCell) addListener(listenerID string, cb EventCallback) { cell.mtx.Lock() + defer cell.mtx.Unlock() cell.listeners[listenerID] = cb - cell.mtx.Unlock() } -func (cell *eventCell) RemoveListener(listenerID string) int { - cell.mtx.Lock() - delete(cell.listeners, listenerID) - numListeners := len(cell.listeners) - cell.mtx.Unlock() - return numListeners -} - -func (cell *eventCell) FireEvent(ctx context.Context, data EventData) { +func (cell *eventCell) fireEvent(ctx context.Context, data EventData) { cell.mtx.RLock() eventCallbacks := make([]EventCallback, 0, len(cell.listeners)) for _, cb := range cell.listeners { @@ -145,50 +113,3 @@ func (cell *eventCell) FireEvent(ctx context.Context, data EventData) { } } } - -//----------------------------------------------------------------------------- - -type EventCallback func(ctx context.Context, data EventData) error - -type eventListener struct { - id string - - mtx sync.RWMutex - removed bool - events []string -} - -func newEventListener(id string) *eventListener { - return &eventListener{ - id: id, - removed: false, - events: nil, - } -} - -func (evl *eventListener) AddEvent(event string) error { - evl.mtx.Lock() - - if evl.removed { - evl.mtx.Unlock() - return ErrListenerWasRemoved{listenerID: evl.id} - } - - evl.events = append(evl.events, event) - evl.mtx.Unlock() - return nil -} - -func (evl *eventListener) GetEvents() []string { - evl.mtx.RLock() - events := make([]string, len(evl.events)) - copy(events, evl.events) - evl.mtx.RUnlock() - return events -} - -func (evl *eventListener) SetRemoved() { - evl.mtx.Lock() - evl.removed = true - evl.mtx.Unlock() -} diff --git a/libs/events/events_test.go b/libs/events/events_test.go index 55e0103da..fba2feba8 100644 --- a/libs/events/events_test.go +++ b/libs/events/events_test.go @@ -8,8 +8,6 @@ import ( "time" "github.com/stretchr/testify/require" - - "github.com/tendermint/tendermint/libs/log" ) // TestAddListenerForEventFireOnce sets up an EventSwitch, subscribes a single @@ -18,9 +16,7 @@ func TestAddListenerForEventFireOnce(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) - - evsw := NewEventSwitch(logger) + evsw := NewEventSwitch() messages := make(chan EventData) require.NoError(t, evsw.AddListenerForEvent("listener", "event", @@ -45,9 +41,7 @@ func TestAddListenerForEventFireMany(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) - - evsw := NewEventSwitch(logger) + evsw := NewEventSwitch() doneSum := make(chan uint64) doneSending := make(chan uint64) @@ -81,9 +75,7 @@ func TestAddListenerForDifferentEvents(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) - - evsw := NewEventSwitch(logger) + evsw := NewEventSwitch() doneSum := make(chan uint64) doneSending1 := make(chan uint64) @@ -143,8 +135,7 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) - evsw := NewEventSwitch(logger) + evsw := NewEventSwitch() doneSum1 := make(chan uint64) doneSum2 := make(chan uint64) @@ -235,9 +226,8 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) { func TestManageListenersAsync(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) - evsw := NewEventSwitch(logger) + evsw := NewEventSwitch() doneSum1 := make(chan uint64) doneSum2 := make(chan uint64) diff --git a/libs/service/service.go b/libs/service/service.go index 6221c7d92..3ce08e7da 100644 --- a/libs/service/service.go +++ b/libs/service/service.go @@ -12,11 +12,8 @@ var ( // errAlreadyStopped is returned when somebody tries to stop an already // stopped service (without resetting it). errAlreadyStopped = errors.New("already stopped") -) -var ( _ Service = (*BaseService)(nil) - _ Service = (*NopService)(nil) ) // Service defines a service that can be started, stopped, and reset. @@ -75,8 +72,7 @@ Typical usage: } func (fs *FooService) OnStop() { - // close/destroy private fields - // stop subroutines, etc. + // close/destroy private fields and releases resources } */ type BaseService struct { @@ -90,12 +86,6 @@ type BaseService struct { impl Implementation } -type NopService struct{} - -func (NopService) Start(_ context.Context) error { return nil } -func (NopService) IsRunning() bool { return true } -func (NopService) Wait() {} - // NewBaseService creates a new BaseService. func NewBaseService(logger log.Logger, name string, impl Implementation) *BaseService { return &BaseService{ @@ -206,5 +196,5 @@ func (bs *BaseService) getWait() <-chan struct{} { // Wait blocks until the service is stopped. func (bs *BaseService) Wait() { <-bs.getWait() } -// String implements Service by returning a string representation of the service. +// String provides a human-friendly representation of the service. func (bs *BaseService) String() string { return bs.name } diff --git a/scripts/confix/confix.go b/scripts/confix/confix.go index a3e6d57bb..d77950e7f 100644 --- a/scripts/confix/confix.go +++ b/scripts/confix/confix.go @@ -98,8 +98,6 @@ var plan = transform.Plan{ }, { // Since https://github.com/tendermint/tendermint/pull/6241. - // - // TODO(creachadair): backport into v0.35.x. Desc: `Add top-level mode setting (default "full")`, T: transform.EnsureKey(nil, &parser.KeyValue{ Block: parser.Comments{"Mode of Node: full | validator | seed"}, @@ -134,16 +132,12 @@ var plan = transform.Plan{ }, { // Since https://github.com/tendermint/tendermint/pull/6396. - // - // TODO(creachadair): backport into v0.35.x. Desc: "Remove vestigial mempool.wal-dir setting", T: transform.Remove(parser.Key{"mempool", "wal-dir"}), ErrorOK: true, }, { // Since https://github.com/tendermint/tendermint/pull/6323. - // - // TODO(creachadair): backport into v0.35.x. Desc: "Add new [p2p] queue-type setting", T: transform.EnsureKey(parser.Key{"p2p"}, &parser.KeyValue{ Block: parser.Comments{"Select the p2p internal queue"}, @@ -154,8 +148,6 @@ var plan = transform.Plan{ }, { // Since https://github.com/tendermint/tendermint/pull/6353. - // - // TODO(creachadair): backport into v0.35.x. Desc: "Add [p2p] connection count and rate limit settings", T: transform.Func(func(_ context.Context, doc *tomledit.Document) error { tab := transform.FindTable(doc, "p2p") @@ -181,8 +173,6 @@ var plan = transform.Plan{ // Added "chunk-fetchers" https://github.com/tendermint/tendermint/pull/6566. // This value was backported into v0.34.11 (modulo casing). // Renamed to "fetchers" https://github.com/tendermint/tendermint/pull/6587. - // - // TODO(creachadair): backport into v0.35.x. Desc: "Rename statesync.chunk-fetchers to statesync.fetchers", T: transform.Func(func(ctx context.Context, doc *tomledit.Document) error { // If the key already exists, rename it preserving its value. @@ -204,8 +194,6 @@ var plan = transform.Plan{ { // Since https://github.com/tendermint/tendermint/pull/6807. // Backported into v0.34.13 (modulo casing). - // - // TODO(creachadair): backport into v0.35.x. Desc: "Add statesync.use-p2p setting", T: transform.EnsureKey(parser.Key{"statesync"}, &parser.KeyValue{ Block: parser.Comments{