mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-07 13:55:17 +00:00
improve evidence db #4722
merged the existing store into pool, consolidated the three buckets into two, used block height as a marked for committed evidence, evidence list recovers on start up, improved error handling
This commit is contained in:
@@ -1,21 +0,0 @@
|
||||
package evidence
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// ErrInvalidEvidence returns when evidence failed to validate
|
||||
type ErrInvalidEvidence struct {
|
||||
Reason error
|
||||
}
|
||||
|
||||
func (e ErrInvalidEvidence) Error() string {
|
||||
return fmt.Sprintf("evidence is not valid: %v ", e.Reason)
|
||||
}
|
||||
|
||||
// ErrEvidenceAlreadyStored indicates that the evidence has already been stored in the evidence db
|
||||
type ErrEvidenceAlreadyStored struct{}
|
||||
|
||||
func (e ErrEvidenceAlreadyStored) Error() string {
|
||||
return "evidence is already stored"
|
||||
}
|
||||
238
evidence/pool.go
238
evidence/pool.go
@@ -14,12 +14,17 @@ import (
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// Pool maintains a pool of valid evidence in an Store.
|
||||
const (
|
||||
baseKeyCommitted = byte(0x00) // committed evidence
|
||||
baseKeyPending = byte(0x01) // pending evidence
|
||||
)
|
||||
|
||||
// Pool maintains a pool of valid evidence to be broadcasted and committed
|
||||
type Pool struct {
|
||||
logger log.Logger
|
||||
|
||||
store *Store
|
||||
evidenceList *clist.CList // concurrent linked-list of evidence
|
||||
evidenceStore dbm.DB
|
||||
evidenceList *clist.CList // concurrent linked-list of evidence
|
||||
|
||||
// needed to load validators to verify evidence
|
||||
stateDB dbm.DB
|
||||
@@ -41,7 +46,6 @@ type valToLastHeightMap map[string]int64
|
||||
|
||||
func NewPool(stateDB, evidenceDB dbm.DB, blockStore *store.BlockStore) (*Pool, error) {
|
||||
var (
|
||||
store = NewStore(evidenceDB)
|
||||
state = sm.LoadState(stateDB)
|
||||
)
|
||||
|
||||
@@ -50,49 +54,44 @@ func NewPool(stateDB, evidenceDB dbm.DB, blockStore *store.BlockStore) (*Pool, e
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Pool{
|
||||
pool := &Pool{
|
||||
stateDB: stateDB,
|
||||
blockStore: blockStore,
|
||||
state: state,
|
||||
logger: log.NewNopLogger(),
|
||||
store: store,
|
||||
evidenceStore: evidenceDB,
|
||||
evidenceList: clist.New(),
|
||||
valToLastHeight: valToLastHeight,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// if pending evidence already in db, in event of prior failure, then load it back to the evidenceList
|
||||
evList, err := pool.listEvidence(baseKeyPending, -1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, ev := range evList {
|
||||
if pool.IsExpired(ev) {
|
||||
pool.removePendingEvidence(ev)
|
||||
continue
|
||||
}
|
||||
pool.evidenceList.PushBack(ev)
|
||||
}
|
||||
|
||||
return pool, nil
|
||||
}
|
||||
|
||||
func (evpool *Pool) EvidenceFront() *clist.CElement {
|
||||
return evpool.evidenceList.Front()
|
||||
}
|
||||
|
||||
func (evpool *Pool) EvidenceWaitChan() <-chan struct{} {
|
||||
return evpool.evidenceList.WaitChan()
|
||||
}
|
||||
|
||||
// SetLogger sets the Logger.
|
||||
func (evpool *Pool) SetLogger(l log.Logger) {
|
||||
evpool.logger = l
|
||||
}
|
||||
|
||||
// PriorityEvidence returns the priority evidence.
|
||||
func (evpool *Pool) PriorityEvidence() []types.Evidence {
|
||||
return evpool.store.PriorityEvidence()
|
||||
}
|
||||
|
||||
// PendingEvidence returns up to maxNum uncommitted evidence.
|
||||
// If maxNum is -1, all evidence is returned.
|
||||
// PendingEvidence is used primarily as part of block proposal and returns up to maxNum of uncommitted evidence.
|
||||
// If maxNum is -1, all evidence is returned. Pending evidence is prioritised based on time.
|
||||
func (evpool *Pool) PendingEvidence(maxNum int64) []types.Evidence {
|
||||
return evpool.store.PendingEvidence(maxNum)
|
||||
evidence, err := evpool.listEvidence(baseKeyPending, maxNum)
|
||||
if err != nil {
|
||||
evpool.logger.Error("Unable to retrieve pending evidence", "err", err)
|
||||
}
|
||||
return evidence
|
||||
}
|
||||
|
||||
// State returns the current state of the evpool.
|
||||
func (evpool *Pool) State() sm.State {
|
||||
evpool.mtx.Lock()
|
||||
defer evpool.mtx.Unlock()
|
||||
return evpool.state
|
||||
}
|
||||
|
||||
// Update loads the latest
|
||||
// Update uses the latest block to update the state, the ValToLastHeight map for evidence expiration
|
||||
// and to mark committed evidence
|
||||
func (evpool *Pool) Update(block *types.Block, state sm.State) {
|
||||
// sanity check
|
||||
if state.LastBlockHeight != block.Height {
|
||||
@@ -146,8 +145,8 @@ func (evpool *Pool) AddEvidence(evidence types.Evidence) error {
|
||||
}
|
||||
|
||||
for _, ev := range evList {
|
||||
if evpool.store.Has(evidence) {
|
||||
return ErrEvidenceAlreadyStored{}
|
||||
if evpool.Has(ev) {
|
||||
continue
|
||||
}
|
||||
|
||||
// For lunatic validator evidence, a header needs to be fetched.
|
||||
@@ -165,17 +164,12 @@ func (evpool *Pool) AddEvidence(evidence types.Evidence) error {
|
||||
return fmt.Errorf("failed to verify %v: %w", ev, err)
|
||||
}
|
||||
|
||||
// 2) Compute priority.
|
||||
_, val := valSet.GetByAddress(ev.Address())
|
||||
priority := val.VotingPower
|
||||
|
||||
// 3) Save to store.
|
||||
_, err := evpool.store.AddNewEvidence(ev, priority)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to add new evidence %v: %w", ev, err)
|
||||
// 2) Save to store.
|
||||
if err := evpool.addPendingEvidence(ev); err != nil {
|
||||
return fmt.Errorf("database error: %v", err)
|
||||
}
|
||||
|
||||
// 4) Add evidence to clist.
|
||||
// 3) Add evidence to clist.
|
||||
evpool.evidenceList.PushBack(ev)
|
||||
|
||||
evpool.logger.Info("Verified new evidence of byzantine behaviour", "evidence", ev)
|
||||
@@ -190,20 +184,77 @@ func (evpool *Pool) MarkEvidenceAsCommitted(height int64, lastBlockTime time.Tim
|
||||
// make a map of committed evidence to remove from the clist
|
||||
blockEvidenceMap := make(map[string]struct{})
|
||||
for _, ev := range evidence {
|
||||
evpool.store.MarkEvidenceAsCommitted(ev)
|
||||
blockEvidenceMap[evMapKey(ev)] = struct{}{}
|
||||
// As the evidence is stored in the block store we only need to record the height that it was saved at.
|
||||
key := keyCommitted(ev)
|
||||
evBytes := cdc.MustMarshalBinaryBare(height)
|
||||
if err := evpool.evidenceStore.Set(key, evBytes); err != nil {
|
||||
evpool.logger.Error("Unable to add committed evidence", "err", err)
|
||||
// if we can't move evidence to committed then don't remove the evidence from pending
|
||||
continue
|
||||
}
|
||||
// if pending, remove from that bucket, remember not all evidence has been seen before
|
||||
if evpool.IsPending(ev) {
|
||||
evpool.removePendingEvidence(ev)
|
||||
blockEvidenceMap[evMapKey(ev)] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// remove committed evidence from the clist
|
||||
evidenceParams := evpool.State().ConsensusParams.Evidence
|
||||
evpool.removeEvidence(height, lastBlockTime, evidenceParams, blockEvidenceMap)
|
||||
if len(blockEvidenceMap) != 0 {
|
||||
evidenceParams := evpool.State().ConsensusParams.Evidence
|
||||
evpool.removeEvidenceFromList(height, lastBlockTime, evidenceParams, blockEvidenceMap)
|
||||
}
|
||||
}
|
||||
|
||||
// IsCommitted returns true if we have already seen this exact evidence and it
|
||||
// is already marked as committed.
|
||||
// Has checks whether the evidence exists either pending or already committed
|
||||
func (evpool *Pool) Has(evidence types.Evidence) bool {
|
||||
return evpool.IsPending(evidence) || evpool.IsCommitted(evidence)
|
||||
}
|
||||
|
||||
// IsExpired checks whether evidence is past the maximum age where it can be used
|
||||
func (evpool *Pool) IsExpired(evidence types.Evidence) bool {
|
||||
var (
|
||||
params = evpool.State().ConsensusParams.Evidence
|
||||
ageDuration = evpool.State().LastBlockTime.Sub(evidence.Time())
|
||||
ageNumBlocks = evpool.State().LastBlockHeight - evidence.Height()
|
||||
)
|
||||
return ageNumBlocks > params.MaxAgeNumBlocks &&
|
||||
ageDuration > params.MaxAgeDuration
|
||||
}
|
||||
|
||||
// IsCommitted returns true if we have already seen this exact evidence and it is already marked as committed.
|
||||
func (evpool *Pool) IsCommitted(evidence types.Evidence) bool {
|
||||
ei := evpool.store.getInfo(evidence)
|
||||
return ei.Evidence != nil && ei.Committed
|
||||
key := keyCommitted(evidence)
|
||||
ok, err := evpool.evidenceStore.Has(key)
|
||||
if err != nil {
|
||||
evpool.logger.Error("Unable to find committed evidence", "err", err)
|
||||
}
|
||||
return ok
|
||||
}
|
||||
|
||||
// Checks whether the evidence is already pending. DB errors are passed to the logger.
|
||||
func (evpool *Pool) IsPending(evidence types.Evidence) bool {
|
||||
key := keyPending(evidence)
|
||||
ok, err := evpool.evidenceStore.Has(key)
|
||||
if err != nil {
|
||||
evpool.logger.Error("Unable to find pending evidence", "err", err)
|
||||
}
|
||||
return ok
|
||||
}
|
||||
|
||||
// EvidenceFront goes to the first evidence in the clist
|
||||
func (evpool *Pool) EvidenceFront() *clist.CElement {
|
||||
return evpool.evidenceList.Front()
|
||||
}
|
||||
|
||||
// EvidenceWaitChan is a channel that closes once the first evidence in the list is there. i.e Front is not nil
|
||||
func (evpool *Pool) EvidenceWaitChan() <-chan struct{} {
|
||||
return evpool.evidenceList.WaitChan()
|
||||
}
|
||||
|
||||
// SetLogger sets the Logger.
|
||||
func (evpool *Pool) SetLogger(l log.Logger) {
|
||||
evpool.logger = l
|
||||
}
|
||||
|
||||
// ValidatorLastHeight returns the last height of the validator w/ the
|
||||
@@ -218,7 +269,56 @@ func (evpool *Pool) ValidatorLastHeight(address []byte) int64 {
|
||||
return h
|
||||
}
|
||||
|
||||
func (evpool *Pool) removeEvidence(
|
||||
// State returns the current state of the evpool.
|
||||
func (evpool *Pool) State() sm.State {
|
||||
evpool.mtx.Lock()
|
||||
defer evpool.mtx.Unlock()
|
||||
return evpool.state
|
||||
}
|
||||
|
||||
func (evpool *Pool) addPendingEvidence(evidence types.Evidence) error {
|
||||
evBytes := cdc.MustMarshalBinaryBare(evidence)
|
||||
key := keyPending(evidence)
|
||||
return evpool.evidenceStore.Set(key, evBytes)
|
||||
}
|
||||
|
||||
func (evpool *Pool) removePendingEvidence(evidence types.Evidence) {
|
||||
key := keyPending(evidence)
|
||||
if err := evpool.evidenceStore.Delete(key); err != nil {
|
||||
evpool.logger.Error("Unable to delete pending evidence", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// listEvidence lists up to maxNum pieces of evidence for the given prefix key.
|
||||
// It is wrapped by PriorityEvidence and PendingEvidence for convenience.
|
||||
// If maxNum is -1, there's no cap on the size of returned evidence.
|
||||
func (evpool *Pool) listEvidence(prefixKey byte, maxNum int64) ([]types.Evidence, error) {
|
||||
var count int64
|
||||
var evidence []types.Evidence
|
||||
iter, err := dbm.IteratePrefix(evpool.evidenceStore, []byte{prefixKey})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("database error: %v", err)
|
||||
}
|
||||
defer iter.Close()
|
||||
for ; iter.Valid(); iter.Next() {
|
||||
val := iter.Value()
|
||||
|
||||
if count == maxNum {
|
||||
return evidence, nil
|
||||
}
|
||||
count++
|
||||
|
||||
var ev types.Evidence
|
||||
err := cdc.UnmarshalBinaryBare(val, &ev)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
evidence = append(evidence, ev)
|
||||
}
|
||||
return evidence, nil
|
||||
}
|
||||
|
||||
func (evpool *Pool) removeEvidenceFromList(
|
||||
height int64,
|
||||
lastBlockTime time.Time,
|
||||
params types.EvidenceParams,
|
||||
@@ -324,3 +424,29 @@ func buildValToLastHeightMap(state sm.State, stateDB dbm.DB, blockStore *store.B
|
||||
|
||||
return valToLastHeight, nil
|
||||
}
|
||||
|
||||
// big endian padded hex
|
||||
func bE(h int64) string {
|
||||
return fmt.Sprintf("%0.16X", h)
|
||||
}
|
||||
|
||||
func keyCommitted(evidence types.Evidence) []byte {
|
||||
return append([]byte{baseKeyCommitted}, keySuffix(evidence)...)
|
||||
}
|
||||
|
||||
func keyPending(evidence types.Evidence) []byte {
|
||||
return append([]byte{baseKeyPending}, keySuffix(evidence)...)
|
||||
}
|
||||
|
||||
func keySuffix(evidence types.Evidence) []byte {
|
||||
return []byte(fmt.Sprintf("%s/%X", bE(evidence.Height()), evidence.Hash()))
|
||||
}
|
||||
|
||||
// ErrInvalidEvidence returns when evidence failed to validate
|
||||
type ErrInvalidEvidence struct {
|
||||
Reason error
|
||||
}
|
||||
|
||||
func (e ErrInvalidEvidence) Error() string {
|
||||
return fmt.Sprintf("evidence is not valid: %v ", e.Reason)
|
||||
}
|
||||
|
||||
@@ -45,6 +45,8 @@ func TestEvidencePool(t *testing.T) {
|
||||
if assert.Error(t, err) {
|
||||
assert.Contains(t, err.Error(), "is too old; min height is 32 and evidence can not be older than")
|
||||
}
|
||||
assert.False(t, pool.IsPending(badEvidence))
|
||||
assert.True(t, pool.IsExpired(badEvidence))
|
||||
|
||||
// good evidence
|
||||
evAdded := make(chan struct{})
|
||||
@@ -59,18 +61,18 @@ func TestEvidencePool(t *testing.T) {
|
||||
select {
|
||||
case <-evAdded:
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("evidence was not added after 5s")
|
||||
t.Fatal("evidence was not added to list after 5s")
|
||||
}
|
||||
|
||||
assert.Equal(t, 1, pool.evidenceList.Len())
|
||||
|
||||
// if we send it again, it shouldnt add and return an error
|
||||
err = pool.AddEvidence(goodEvidence)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, pool.evidenceList.Len())
|
||||
}
|
||||
|
||||
func TestEvidencePoolIsCommitted(t *testing.T) {
|
||||
func TestProposingAndCommittingEvidence(t *testing.T) {
|
||||
var (
|
||||
valAddr = []byte("validator_address")
|
||||
height = int64(1)
|
||||
@@ -79,22 +81,31 @@ func TestEvidencePoolIsCommitted(t *testing.T) {
|
||||
evidenceDB = dbm.NewMemDB()
|
||||
blockStoreDB = dbm.NewMemDB()
|
||||
blockStore = initializeBlockStore(blockStoreDB, sm.LoadState(stateDB), valAddr)
|
||||
evidenceTime = time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
)
|
||||
|
||||
pool, err := NewPool(stateDB, evidenceDB, blockStore)
|
||||
require.NoError(t, err)
|
||||
|
||||
// evidence not seen yet:
|
||||
evidence := types.NewMockEvidence(height, time.Now(), valAddr)
|
||||
evidence := types.NewMockEvidence(height, evidenceTime, valAddr)
|
||||
assert.False(t, pool.IsCommitted(evidence))
|
||||
|
||||
// evidence seen but not yet committed:
|
||||
assert.NoError(t, pool.AddEvidence(evidence))
|
||||
assert.False(t, pool.IsCommitted(evidence))
|
||||
|
||||
// test evidence is proposed
|
||||
proposedEvidence := pool.PendingEvidence(-1)
|
||||
assert.Equal(t, proposedEvidence[0], evidence)
|
||||
|
||||
// evidence seen and committed:
|
||||
pool.MarkEvidenceAsCommitted(height, lastBlockTime, []types.Evidence{evidence})
|
||||
pool.MarkEvidenceAsCommitted(height, lastBlockTime, proposedEvidence)
|
||||
assert.True(t, pool.IsCommitted(evidence))
|
||||
assert.False(t, pool.IsPending(evidence))
|
||||
assert.Equal(t, 0, pool.evidenceList.Len())
|
||||
|
||||
// evidence should
|
||||
}
|
||||
|
||||
func TestEvidencePoolAddEvidence(t *testing.T) {
|
||||
@@ -184,6 +195,35 @@ func TestEvidencePoolNewPool(t *testing.T) {
|
||||
assert.EqualValues(t, 0, pool.ValidatorLastHeight([]byte("non-existent-validator")))
|
||||
}
|
||||
|
||||
func TestRecoverPendingEvidence(t *testing.T) {
|
||||
var (
|
||||
valAddr = []byte("val1")
|
||||
height = int64(30)
|
||||
stateDB = initializeValidatorState(valAddr, height)
|
||||
evidenceDB = dbm.NewMemDB()
|
||||
blockStoreDB = dbm.NewMemDB()
|
||||
state = sm.LoadState(stateDB)
|
||||
blockStore = initializeBlockStore(blockStoreDB, state, valAddr)
|
||||
evidenceTime = time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
goodEvidence = types.NewMockEvidence(height, time.Now(), valAddr)
|
||||
expiredEvidence = types.NewMockEvidence(int64(1), evidenceTime, valAddr)
|
||||
)
|
||||
|
||||
// load good evidence
|
||||
goodKey := keyPending(goodEvidence)
|
||||
goodEvidenceBytes := cdc.MustMarshalBinaryBare(goodEvidence)
|
||||
_ = evidenceDB.Set(goodKey, goodEvidenceBytes)
|
||||
|
||||
// load expired evidence
|
||||
expiredKey := keyPending(expiredEvidence)
|
||||
expiredEvidenceBytes := cdc.MustMarshalBinaryBare(expiredEvidence)
|
||||
_ = evidenceDB.Set(expiredKey, expiredEvidenceBytes)
|
||||
pool, err := NewPool(stateDB, evidenceDB, blockStore)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, pool.evidenceList.Len())
|
||||
assert.True(t, pool.IsPending(goodEvidence))
|
||||
}
|
||||
|
||||
func initializeValidatorState(valAddr []byte, height int64) dbm.DB {
|
||||
stateDB := dbm.NewMemDB()
|
||||
|
||||
|
||||
@@ -88,8 +88,6 @@ func (evR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
// punish peer
|
||||
evR.Switch.StopPeerForError(src, err)
|
||||
return
|
||||
case ErrEvidenceAlreadyStored:
|
||||
evR.Logger.Debug("Evidence already exists", "evidence", msg.Evidence)
|
||||
case nil:
|
||||
default:
|
||||
evR.Logger.Error("Evidence has not been added", "evidence", msg.Evidence, "err", err)
|
||||
|
||||
@@ -1,222 +0,0 @@
|
||||
package evidence
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
/*
|
||||
Requirements:
|
||||
- Valid new evidence must be persisted immediately and never forgotten
|
||||
- Uncommitted evidence must be continuously broadcast
|
||||
- Uncommitted evidence has a partial order, the evidence's priority
|
||||
|
||||
Impl:
|
||||
- First commit atomically in outqueue, pending, lookup.
|
||||
- Once broadcast, remove from outqueue. No need to sync
|
||||
- Once committed, atomically remove from pending and update lookup.
|
||||
|
||||
Schema for indexing evidence (note you need both height and hash to find a piece of evidence):
|
||||
|
||||
"evidence-lookup"/<evidence-height>/<evidence-hash> -> Info
|
||||
"evidence-outqueue"/<priority>/<evidence-height>/<evidence-hash> -> Info
|
||||
"evidence-pending"/<evidence-height>/<evidence-hash> -> Info
|
||||
*/
|
||||
|
||||
type Info struct {
|
||||
Committed bool
|
||||
Priority int64
|
||||
Evidence types.Evidence
|
||||
}
|
||||
|
||||
const (
|
||||
baseKeyLookup = "evidence-lookup" // all evidence
|
||||
baseKeyOutqueue = "evidence-outqueue" // not-yet broadcast
|
||||
baseKeyPending = "evidence-pending" // broadcast but not committed
|
||||
)
|
||||
|
||||
func keyLookup(evidence types.Evidence) []byte {
|
||||
return keyLookupFromHeightAndHash(evidence.Height(), evidence.Hash())
|
||||
}
|
||||
|
||||
// big endian padded hex
|
||||
func bE(h int64) string {
|
||||
return fmt.Sprintf("%0.16X", h)
|
||||
}
|
||||
|
||||
func keyLookupFromHeightAndHash(height int64, hash []byte) []byte {
|
||||
return _key("%s/%s/%X", baseKeyLookup, bE(height), hash)
|
||||
}
|
||||
|
||||
func keyOutqueue(evidence types.Evidence, priority int64) []byte {
|
||||
return _key("%s/%s/%s/%X", baseKeyOutqueue, bE(priority), bE(evidence.Height()), evidence.Hash())
|
||||
}
|
||||
|
||||
func keyPending(evidence types.Evidence) []byte {
|
||||
return _key("%s/%s/%X", baseKeyPending, bE(evidence.Height()), evidence.Hash())
|
||||
}
|
||||
|
||||
func _key(format string, o ...interface{}) []byte {
|
||||
return []byte(fmt.Sprintf(format, o...))
|
||||
}
|
||||
|
||||
// Store is a store of all the evidence we've seen, including
|
||||
// evidence that has been committed, evidence that has been verified but not broadcast,
|
||||
// and evidence that has been broadcast but not yet committed.
|
||||
type Store struct {
|
||||
db dbm.DB
|
||||
}
|
||||
|
||||
func NewStore(db dbm.DB) *Store {
|
||||
return &Store{
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
// PriorityEvidence returns the evidence from the outqueue, sorted by highest priority.
|
||||
func (store *Store) PriorityEvidence() (evidence []types.Evidence) {
|
||||
// reverse the order so highest priority is first
|
||||
l := store.listEvidence(baseKeyOutqueue, -1)
|
||||
for i, j := 0, len(l)-1; i < j; i, j = i+1, j-1 {
|
||||
l[i], l[j] = l[j], l[i]
|
||||
}
|
||||
|
||||
return l
|
||||
}
|
||||
|
||||
// PendingEvidence returns up to maxNum known, uncommitted evidence.
|
||||
// If maxNum is -1, all evidence is returned.
|
||||
func (store *Store) PendingEvidence(maxNum int64) (evidence []types.Evidence) {
|
||||
return store.listEvidence(baseKeyPending, maxNum)
|
||||
}
|
||||
|
||||
// listEvidence lists up to maxNum pieces of evidence for the given prefix key.
|
||||
// It is wrapped by PriorityEvidence and PendingEvidence for convenience.
|
||||
// If maxNum is -1, there's no cap on the size of returned evidence.
|
||||
func (store *Store) listEvidence(prefixKey string, maxNum int64) (evidence []types.Evidence) {
|
||||
var count int64
|
||||
iter, err := dbm.IteratePrefix(store.db, []byte(prefixKey))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer iter.Close()
|
||||
for ; iter.Valid(); iter.Next() {
|
||||
val := iter.Value()
|
||||
|
||||
if count == maxNum {
|
||||
return evidence
|
||||
}
|
||||
count++
|
||||
|
||||
var ei Info
|
||||
err := cdc.UnmarshalBinaryBare(val, &ei)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
evidence = append(evidence, ei.Evidence)
|
||||
}
|
||||
return evidence
|
||||
}
|
||||
|
||||
// GetInfo fetches the Info with the given height and hash.
|
||||
// If not found, ei.Evidence is nil.
|
||||
func (store *Store) GetInfo(height int64, hash []byte) Info {
|
||||
key := keyLookupFromHeightAndHash(height, hash)
|
||||
val, err := store.db.Get(key)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if len(val) == 0 {
|
||||
return Info{}
|
||||
}
|
||||
var ei Info
|
||||
err = cdc.UnmarshalBinaryBare(val, &ei)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return ei
|
||||
}
|
||||
|
||||
// Has checks if the evidence is already stored
|
||||
func (store *Store) Has(evidence types.Evidence) bool {
|
||||
key := keyLookup(evidence)
|
||||
ok, _ := store.db.Has(key)
|
||||
return ok
|
||||
}
|
||||
|
||||
// AddNewEvidence adds the given evidence to the database.
|
||||
// It returns false if the evidence is already stored.
|
||||
func (store *Store) AddNewEvidence(evidence types.Evidence, priority int64) (bool, error) {
|
||||
// check if we already have seen it
|
||||
if store.Has(evidence) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
ei := Info{
|
||||
Committed: false,
|
||||
Priority: priority,
|
||||
Evidence: evidence,
|
||||
}
|
||||
eiBytes := cdc.MustMarshalBinaryBare(ei)
|
||||
|
||||
// add it to the store
|
||||
var err error
|
||||
key := keyOutqueue(evidence, priority)
|
||||
if err = store.db.Set(key, eiBytes); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
key = keyPending(evidence)
|
||||
if err = store.db.Set(key, eiBytes); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
key = keyLookup(evidence)
|
||||
if err = store.db.SetSync(key, eiBytes); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// MarkEvidenceAsBroadcasted removes evidence from Outqueue.
|
||||
func (store *Store) MarkEvidenceAsBroadcasted(evidence types.Evidence) {
|
||||
ei := store.getInfo(evidence)
|
||||
if ei.Evidence == nil {
|
||||
// nothing to do; we did not store the evidence yet (AddNewEvidence):
|
||||
return
|
||||
}
|
||||
// remove from the outqueue
|
||||
key := keyOutqueue(evidence, ei.Priority)
|
||||
store.db.Delete(key)
|
||||
}
|
||||
|
||||
// MarkEvidenceAsCommitted removes evidence from pending and outqueue and sets the state to committed.
|
||||
func (store *Store) MarkEvidenceAsCommitted(evidence types.Evidence) {
|
||||
// if its committed, its been broadcast
|
||||
store.MarkEvidenceAsBroadcasted(evidence)
|
||||
|
||||
pendingKey := keyPending(evidence)
|
||||
store.db.Delete(pendingKey)
|
||||
|
||||
// committed Info doens't need priority
|
||||
ei := Info{
|
||||
Committed: true,
|
||||
Evidence: evidence,
|
||||
Priority: 0,
|
||||
}
|
||||
|
||||
lookupKey := keyLookup(evidence)
|
||||
store.db.SetSync(lookupKey, cdc.MustMarshalBinaryBare(ei))
|
||||
}
|
||||
|
||||
//---------------------------------------------------
|
||||
// utils
|
||||
|
||||
// getInfo is convenience for calling GetInfo if we have the full evidence.
|
||||
func (store *Store) getInfo(evidence types.Evidence) Info {
|
||||
return store.GetInfo(evidence.Height(), evidence.Hash())
|
||||
}
|
||||
@@ -1,126 +0,0 @@
|
||||
package evidence
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
//-------------------------------------------
|
||||
|
||||
func TestStoreAddDuplicate(t *testing.T) {
|
||||
db := dbm.NewMemDB()
|
||||
store := NewStore(db)
|
||||
|
||||
priority := int64(10)
|
||||
ev := types.NewMockEvidence(2, time.Now().UTC(), []byte("val1"))
|
||||
|
||||
added, err := store.AddNewEvidence(ev, priority)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, added)
|
||||
|
||||
// cant add twice
|
||||
added, err = store.AddNewEvidence(ev, priority)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, added)
|
||||
}
|
||||
|
||||
func TestStoreCommitDuplicate(t *testing.T) {
|
||||
db := dbm.NewMemDB()
|
||||
store := NewStore(db)
|
||||
|
||||
priority := int64(10)
|
||||
ev := types.NewMockEvidence(2, time.Now().UTC(), []byte("val1"))
|
||||
|
||||
store.MarkEvidenceAsCommitted(ev)
|
||||
|
||||
added, err := store.AddNewEvidence(ev, priority)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, added)
|
||||
}
|
||||
|
||||
func TestStoreMark(t *testing.T) {
|
||||
db := dbm.NewMemDB()
|
||||
store := NewStore(db)
|
||||
|
||||
// before we do anything, priority/pending are empty
|
||||
priorityEv := store.PriorityEvidence()
|
||||
pendingEv := store.PendingEvidence(-1)
|
||||
assert.Equal(t, 0, len(priorityEv))
|
||||
assert.Equal(t, 0, len(pendingEv))
|
||||
|
||||
priority := int64(10)
|
||||
ev := types.NewMockEvidence(2, time.Now().UTC(), []byte("val1"))
|
||||
|
||||
added, err := store.AddNewEvidence(ev, priority)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, added)
|
||||
|
||||
// get the evidence. verify. should be uncommitted
|
||||
ei := store.GetInfo(ev.Height(), ev.Hash())
|
||||
assert.Equal(t, ev, ei.Evidence)
|
||||
assert.Equal(t, priority, ei.Priority)
|
||||
assert.False(t, ei.Committed)
|
||||
|
||||
// new evidence should be returns in priority/pending
|
||||
priorityEv = store.PriorityEvidence()
|
||||
pendingEv = store.PendingEvidence(-1)
|
||||
assert.Equal(t, 1, len(priorityEv))
|
||||
assert.Equal(t, 1, len(pendingEv))
|
||||
|
||||
// priority is now empty
|
||||
store.MarkEvidenceAsBroadcasted(ev)
|
||||
priorityEv = store.PriorityEvidence()
|
||||
pendingEv = store.PendingEvidence(-1)
|
||||
assert.Equal(t, 0, len(priorityEv))
|
||||
assert.Equal(t, 1, len(pendingEv))
|
||||
|
||||
// priority and pending are now empty
|
||||
store.MarkEvidenceAsCommitted(ev)
|
||||
priorityEv = store.PriorityEvidence()
|
||||
pendingEv = store.PendingEvidence(-1)
|
||||
assert.Equal(t, 0, len(priorityEv))
|
||||
assert.Equal(t, 0, len(pendingEv))
|
||||
|
||||
// evidence should show committed
|
||||
newPriority := int64(0)
|
||||
ei = store.GetInfo(ev.Height(), ev.Hash())
|
||||
assert.Equal(t, ev, ei.Evidence)
|
||||
assert.Equal(t, newPriority, ei.Priority)
|
||||
assert.True(t, ei.Committed)
|
||||
}
|
||||
|
||||
func TestStorePriority(t *testing.T) {
|
||||
db := dbm.NewMemDB()
|
||||
store := NewStore(db)
|
||||
|
||||
// sorted by priority and then height
|
||||
cases := []struct {
|
||||
ev types.MockEvidence
|
||||
priority int64
|
||||
}{
|
||||
{types.NewMockEvidence(2, time.Now().UTC(), []byte("val1")), 17},
|
||||
{types.NewMockEvidence(5, time.Now().UTC(), []byte("val2")), 15},
|
||||
{types.NewMockEvidence(10, time.Now().UTC(), []byte("val2")), 13},
|
||||
{types.NewMockEvidence(100, time.Now().UTC(), []byte("val2")), 11},
|
||||
{types.NewMockEvidence(90, time.Now().UTC(), []byte("val2")), 11},
|
||||
{types.NewMockEvidence(80, time.Now().UTC(), []byte("val2")), 11},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
added, err := store.AddNewEvidence(c.ev, c.priority)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, added)
|
||||
}
|
||||
|
||||
evList := store.PriorityEvidence()
|
||||
for i, ev := range evList {
|
||||
assert.Equal(t, ev, cases[i].ev)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user