Merge latest changes from master

Signed-off-by: Thane Thomson <connect@thanethomson.com>
This commit is contained in:
Thane Thomson
2022-04-13 14:00:40 -04:00
7 changed files with 16 additions and 231 deletions

View File

@@ -220,7 +220,7 @@ func NewState(
doWALCatchup: true,
wal: nilWAL{},
evpool: evpool,
evsw: tmevents.NewEventSwitch(logger),
evsw: tmevents.NewEventSwitch(),
metrics: NopMetrics(),
onStopCh: make(chan *cstypes.RoundState),
}

View File

@@ -1,46 +0,0 @@
package state
import (
"sort"
"time"
)
// weightedTime for computing a median.
type weightedTime struct {
Time time.Time
Weight int64
}
// newWeightedTime with time and weight.
func newWeightedTime(time time.Time, weight int64) *weightedTime {
return &weightedTime{
Time: time,
Weight: weight,
}
}
// weightedMedian computes weighted median time for a given array of WeightedTime and the total voting power.
func weightedMedian(weightedTimes []*weightedTime, totalVotingPower int64) (res time.Time) {
median := totalVotingPower / 2
sort.Slice(weightedTimes, func(i, j int) bool {
if weightedTimes[i] == nil {
return false
}
if weightedTimes[j] == nil {
return true
}
return weightedTimes[i].Time.UnixNano() < weightedTimes[j].Time.UnixNano()
})
for _, weightedTime := range weightedTimes {
if weightedTime != nil {
if median <= weightedTime.Weight {
res = weightedTime.Time
break
}
median -= weightedTime.Weight
}
}
return
}

View File

@@ -1,58 +0,0 @@
package state
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
tmtime "github.com/tendermint/tendermint/libs/time"
)
func TestWeightedMedian(t *testing.T) {
m := make([]*weightedTime, 3)
t1 := tmtime.Now()
t2 := t1.Add(5 * time.Second)
t3 := t1.Add(10 * time.Second)
m[2] = newWeightedTime(t1, 33) // faulty processes
m[0] = newWeightedTime(t2, 40) // correct processes
m[1] = newWeightedTime(t3, 27) // correct processes
totalVotingPower := int64(100)
median := weightedMedian(m, totalVotingPower)
assert.Equal(t, t2, median)
// median always returns value between values of correct processes
assert.Equal(t, true, (median.After(t1) || median.Equal(t1)) &&
(median.Before(t3) || median.Equal(t3)))
m[1] = newWeightedTime(t1, 40) // correct processes
m[2] = newWeightedTime(t2, 27) // correct processes
m[0] = newWeightedTime(t3, 33) // faulty processes
totalVotingPower = int64(100)
median = weightedMedian(m, totalVotingPower)
assert.Equal(t, t2, median)
// median always returns value between values of correct processes
assert.Equal(t, true, (median.After(t1) || median.Equal(t1)) &&
(median.Before(t2) || median.Equal(t2)))
m = make([]*weightedTime, 8)
t4 := t1.Add(15 * time.Second)
t5 := t1.Add(60 * time.Second)
m[3] = newWeightedTime(t1, 10) // correct processes
m[1] = newWeightedTime(t2, 10) // correct processes
m[5] = newWeightedTime(t2, 10) // correct processes
m[4] = newWeightedTime(t3, 23) // faulty processes
m[0] = newWeightedTime(t4, 20) // correct processes
m[7] = newWeightedTime(t5, 10) // faulty processes
totalVotingPower = int64(83)
median = weightedMedian(m, totalVotingPower)
assert.Equal(t, t3, median)
// median always returns value between values of correct processes
assert.Equal(t, true, (median.After(t1) || median.Equal(t1)) &&
(median.Before(t4) || median.Equal(t4)))
}

View File

@@ -3,22 +3,9 @@ package events
import (
"context"
"fmt"
"sync"
"github.com/tendermint/tendermint/libs/log"
)
// ErrListenerWasRemoved is returned by AddEvent if the listener was removed.
type ErrListenerWasRemoved struct {
listenerID string
}
// Error implements the error interface.
func (e ErrListenerWasRemoved) Error() string {
return fmt.Sprintf("listener #%s was removed", e.listenerID)
}
// EventData is a generic event data can be typed and registered with
// tendermint/go-amino via concrete implementation of this interface.
type EventData interface{}
@@ -51,13 +38,11 @@ type EventSwitch interface {
type eventSwitch struct {
mtx sync.RWMutex
eventCells map[string]*eventCell
listeners map[string]*eventListener
}
func NewEventSwitch(logger log.Logger) EventSwitch {
func NewEventSwitch() EventSwitch {
evsw := &eventSwitch{
eventCells: make(map[string]*eventCell),
listeners: make(map[string]*eventListener),
}
return evsw
}
@@ -71,20 +56,9 @@ func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb E
eventCell = newEventCell()
evsw.eventCells[eventValue] = eventCell
}
listener := evsw.listeners[listenerID]
if listener == nil {
listener = newEventListener(listenerID)
evsw.listeners[listenerID] = listener
}
evsw.mtx.Unlock()
if err := listener.AddEvent(eventValue); err != nil {
return err
}
eventCell.AddListener(listenerID, cb)
eventCell.addListener(listenerID, cb)
return nil
}
@@ -99,11 +73,13 @@ func (evsw *eventSwitch) FireEvent(ctx context.Context, event string, data Event
}
// Fire event for all listeners in eventCell
eventCell.FireEvent(ctx, data)
eventCell.fireEvent(ctx, data)
}
//-----------------------------------------------------------------------------
type EventCallback func(ctx context.Context, data EventData) error
// eventCell handles keeping track of listener callbacks for a given event.
type eventCell struct {
mtx sync.RWMutex
@@ -116,21 +92,13 @@ func newEventCell() *eventCell {
}
}
func (cell *eventCell) AddListener(listenerID string, cb EventCallback) {
func (cell *eventCell) addListener(listenerID string, cb EventCallback) {
cell.mtx.Lock()
defer cell.mtx.Unlock()
cell.listeners[listenerID] = cb
cell.mtx.Unlock()
}
func (cell *eventCell) RemoveListener(listenerID string) int {
cell.mtx.Lock()
delete(cell.listeners, listenerID)
numListeners := len(cell.listeners)
cell.mtx.Unlock()
return numListeners
}
func (cell *eventCell) FireEvent(ctx context.Context, data EventData) {
func (cell *eventCell) fireEvent(ctx context.Context, data EventData) {
cell.mtx.RLock()
eventCallbacks := make([]EventCallback, 0, len(cell.listeners))
for _, cb := range cell.listeners {
@@ -145,50 +113,3 @@ func (cell *eventCell) FireEvent(ctx context.Context, data EventData) {
}
}
}
//-----------------------------------------------------------------------------
type EventCallback func(ctx context.Context, data EventData) error
type eventListener struct {
id string
mtx sync.RWMutex
removed bool
events []string
}
func newEventListener(id string) *eventListener {
return &eventListener{
id: id,
removed: false,
events: nil,
}
}
func (evl *eventListener) AddEvent(event string) error {
evl.mtx.Lock()
if evl.removed {
evl.mtx.Unlock()
return ErrListenerWasRemoved{listenerID: evl.id}
}
evl.events = append(evl.events, event)
evl.mtx.Unlock()
return nil
}
func (evl *eventListener) GetEvents() []string {
evl.mtx.RLock()
events := make([]string, len(evl.events))
copy(events, evl.events)
evl.mtx.RUnlock()
return events
}
func (evl *eventListener) SetRemoved() {
evl.mtx.Lock()
evl.removed = true
evl.mtx.Unlock()
}

View File

@@ -8,8 +8,6 @@ import (
"time"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
)
// TestAddListenerForEventFireOnce sets up an EventSwitch, subscribes a single
@@ -18,9 +16,7 @@ func TestAddListenerForEventFireOnce(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger)
evsw := NewEventSwitch()
messages := make(chan EventData)
require.NoError(t, evsw.AddListenerForEvent("listener", "event",
@@ -45,9 +41,7 @@ func TestAddListenerForEventFireMany(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger)
evsw := NewEventSwitch()
doneSum := make(chan uint64)
doneSending := make(chan uint64)
@@ -81,9 +75,7 @@ func TestAddListenerForDifferentEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger)
evsw := NewEventSwitch()
doneSum := make(chan uint64)
doneSending1 := make(chan uint64)
@@ -143,8 +135,7 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger)
evsw := NewEventSwitch()
doneSum1 := make(chan uint64)
doneSum2 := make(chan uint64)
@@ -235,9 +226,8 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
func TestManageListenersAsync(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger)
evsw := NewEventSwitch()
doneSum1 := make(chan uint64)
doneSum2 := make(chan uint64)

View File

@@ -12,11 +12,8 @@ var (
// errAlreadyStopped is returned when somebody tries to stop an already
// stopped service (without resetting it).
errAlreadyStopped = errors.New("already stopped")
)
var (
_ Service = (*BaseService)(nil)
_ Service = (*NopService)(nil)
)
// Service defines a service that can be started, stopped, and reset.
@@ -75,8 +72,7 @@ Typical usage:
}
func (fs *FooService) OnStop() {
// close/destroy private fields
// stop subroutines, etc.
// close/destroy private fields and releases resources
}
*/
type BaseService struct {
@@ -90,12 +86,6 @@ type BaseService struct {
impl Implementation
}
type NopService struct{}
func (NopService) Start(_ context.Context) error { return nil }
func (NopService) IsRunning() bool { return true }
func (NopService) Wait() {}
// NewBaseService creates a new BaseService.
func NewBaseService(logger log.Logger, name string, impl Implementation) *BaseService {
return &BaseService{
@@ -206,5 +196,5 @@ func (bs *BaseService) getWait() <-chan struct{} {
// Wait blocks until the service is stopped.
func (bs *BaseService) Wait() { <-bs.getWait() }
// String implements Service by returning a string representation of the service.
// String provides a human-friendly representation of the service.
func (bs *BaseService) String() string { return bs.name }

View File

@@ -98,8 +98,6 @@ var plan = transform.Plan{
},
{
// Since https://github.com/tendermint/tendermint/pull/6241.
//
// TODO(creachadair): backport into v0.35.x.
Desc: `Add top-level mode setting (default "full")`,
T: transform.EnsureKey(nil, &parser.KeyValue{
Block: parser.Comments{"Mode of Node: full | validator | seed"},
@@ -134,16 +132,12 @@ var plan = transform.Plan{
},
{
// Since https://github.com/tendermint/tendermint/pull/6396.
//
// TODO(creachadair): backport into v0.35.x.
Desc: "Remove vestigial mempool.wal-dir setting",
T: transform.Remove(parser.Key{"mempool", "wal-dir"}),
ErrorOK: true,
},
{
// Since https://github.com/tendermint/tendermint/pull/6323.
//
// TODO(creachadair): backport into v0.35.x.
Desc: "Add new [p2p] queue-type setting",
T: transform.EnsureKey(parser.Key{"p2p"}, &parser.KeyValue{
Block: parser.Comments{"Select the p2p internal queue"},
@@ -154,8 +148,6 @@ var plan = transform.Plan{
},
{
// Since https://github.com/tendermint/tendermint/pull/6353.
//
// TODO(creachadair): backport into v0.35.x.
Desc: "Add [p2p] connection count and rate limit settings",
T: transform.Func(func(_ context.Context, doc *tomledit.Document) error {
tab := transform.FindTable(doc, "p2p")
@@ -181,8 +173,6 @@ var plan = transform.Plan{
// Added "chunk-fetchers" https://github.com/tendermint/tendermint/pull/6566.
// This value was backported into v0.34.11 (modulo casing).
// Renamed to "fetchers" https://github.com/tendermint/tendermint/pull/6587.
//
// TODO(creachadair): backport into v0.35.x.
Desc: "Rename statesync.chunk-fetchers to statesync.fetchers",
T: transform.Func(func(ctx context.Context, doc *tomledit.Document) error {
// If the key already exists, rename it preserving its value.
@@ -204,8 +194,6 @@ var plan = transform.Plan{
{
// Since https://github.com/tendermint/tendermint/pull/6807.
// Backported into v0.34.13 (modulo casing).
//
// TODO(creachadair): backport into v0.35.x.
Desc: "Add statesync.use-p2p setting",
T: transform.EnsureKey(parser.Key{"statesync"}, &parser.KeyValue{
Block: parser.Comments{