From ad552b2bb1174237ca8c927a859b0560ec06a1c1 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 12 Jan 2021 12:50:49 +0100 Subject: [PATCH] evidence: buffer evidence from consensus (#5890) --- CHANGELOG_PENDING.md | 3 ++- evidence/pool.go | 63 ++++++++++++++++++++++++++++++------------- evidence/pool_test.go | 31 ++++++++++++++++----- 3 files changed, 70 insertions(+), 27 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 13b74e0e1..171050ea7 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -25,4 +25,5 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi ### BUG FIXES - +- [evidence] \#5890 Add a buffer to evidence from consensus to avoid broadcasting and proposing evidence before the +height of such an evidence has finished (@cmwaters) diff --git a/evidence/pool.go b/evidence/pool.go index 77dbf1a39..f57490290 100644 --- a/evidence/pool.go +++ b/evidence/pool.go @@ -41,6 +41,10 @@ type Pool struct { mtx sync.Mutex // latest state state sm.State + // evidence from consensus if buffered to this slice, awaiting until the next height + // before being flushed to the pool. This prevents broadcasting and proposing of + // evidence before the height with which the evidence happened is finished. + consensusBuffer []types.Evidence pruningHeight int64 pruningTime time.Time @@ -56,12 +60,13 @@ func NewPool(evidenceDB dbm.DB, stateDB sm.Store, blockStore BlockStore) (*Pool, } pool := &Pool{ - stateDB: stateDB, - blockStore: blockStore, - state: state, - logger: log.NewNopLogger(), - evidenceStore: evidenceDB, - evidenceList: clist.New(), + stateDB: stateDB, + blockStore: blockStore, + state: state, + logger: log.NewNopLogger(), + evidenceStore: evidenceDB, + evidenceList: clist.New(), + consensusBuffer: make([]types.Evidence, 0), } // if pending evidence already in db, in event of prior failure, then check for expiration, @@ -104,9 +109,20 @@ func (evpool *Pool) Update(state sm.State, ev types.EvidenceList) { evpool.logger.Info("Updating evidence pool", "last_block_height", state.LastBlockHeight, "last_block_time", state.LastBlockTime) - // update the state - evpool.updateState(state) + evpool.logger.Info( + "updating evidence pool", + "last_block_height", state.LastBlockHeight, + "last_block_time", state.LastBlockTime, + ) + evpool.mtx.Lock() + // flush awaiting evidence from consensus into pool + evpool.flushConsensusBuffer() + // update state + evpool.state = state + evpool.mtx.Unlock() + + // move committed evidence out from the pending pool and into the committed pool evpool.markEvidenceAsCommitted(ev) // prune pending evidence when it has expired. This also updates when the next evidence will expire @@ -163,13 +179,13 @@ func (evpool *Pool) AddEvidenceFromConsensus(ev types.Evidence) error { return nil } - if err := evpool.addPendingEvidence(ev); err != nil { - return fmt.Errorf("can't add evidence to pending list: %w", err) - } - // add evidence to be gossiped with peers - evpool.evidenceList.PushBack(ev) - - evpool.logger.Info("Verified new evidence of byzantine behavior", "evidence", ev) + // add evidence to a buffer which will pass the evidence to the pool at the following height. + // This avoids the issue of some nodes verifying and proposing evidence at a height where the + // block hasn't been committed on cause others to potentially fail. + evpool.mtx.Lock() + defer evpool.mtx.Unlock() + evpool.consensusBuffer = append(evpool.consensusBuffer, ev) + evpool.logger.Info("received new evidence of byzantine behavior from consensus", "evidence", ev) return nil } @@ -491,10 +507,19 @@ func (evpool *Pool) removeEvidenceFromList( } } -func (evpool *Pool) updateState(state sm.State) { - evpool.mtx.Lock() - defer evpool.mtx.Unlock() - evpool.state = state +// flushConsensusBuffer moves the evidence produced from consensus into the evidence pool +// and list so that it can be broadcasted and proposed +func (evpool *Pool) flushConsensusBuffer() { + for _, ev := range evpool.consensusBuffer { + if err := evpool.addPendingEvidence(ev); err != nil { + evpool.logger.Error("failed to flush evidence from consensus buffer to pending list: %w", err) + continue + } + + evpool.evidenceList.PushBack(ev) + } + // reset consensus buffer + evpool.consensusBuffer = make([]types.Evidence, 0) } func bytesToEv(evBytes []byte) (types.Evidence, error) { diff --git a/evidence/pool_test.go b/evidence/pool_test.go index 046f4efc5..7d842003c 100644 --- a/evidence/pool_test.go +++ b/evidence/pool_test.go @@ -148,16 +148,33 @@ func TestAddEvidenceFromConsensus(t *testing.T) { var height int64 = 10 pool, val := defaultTestPool(height) ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, defaultEvidenceTime, val, evidenceChainID) - err := pool.AddEvidenceFromConsensus(ev) - assert.NoError(t, err) + + require.NoError(t, pool.AddEvidenceFromConsensus(ev)) + + // evidence from consensus should not be added immediately but reside in the consensus buffer + evList, evSize := pool.PendingEvidence(defaultEvidenceMaxBytes) + require.Empty(t, evList) + require.Zero(t, evSize) + next := pool.EvidenceFront() - assert.Equal(t, ev, next.Value.(types.Evidence)) + require.Nil(t, next) + + // move to next height and update state and evidence pool + state := pool.State() + state.LastBlockHeight++ + pool.Update(state, []types.Evidence{}) + + // should be able to retrieve evidence from pool + evList, _ = pool.PendingEvidence(defaultEvidenceMaxBytes) + require.Equal(t, []types.Evidence{ev}, evList) // shouldn't be able to submit the same evidence twice - err = pool.AddEvidenceFromConsensus(ev) - assert.NoError(t, err) - evs, _ := pool.PendingEvidence(defaultEvidenceMaxBytes) - assert.Equal(t, 1, len(evs)) + require.NoError(t, pool.AddEvidenceFromConsensus(ev)) + state = pool.State() + state.LastBlockHeight++ + pool.Update(state, []types.Evidence{}) + evList2, _ := pool.PendingEvidence(defaultEvidenceMaxBytes) + require.Equal(t, evList, evList2) } func TestEvidencePoolUpdate(t *testing.T) {