rpc, node, mempool and start of ight client

This commit is contained in:
Marko Baricevic
2021-02-02 14:42:20 +01:00
parent b2f3e767a7
commit 7e7e962d93
26 changed files with 73 additions and 77 deletions

View File

@@ -23,10 +23,10 @@ const (
)
type blockStore interface {
LoadBlock(height int64) *types.Block
LoadBlock(height uint64) *types.Block
SaveBlock(*types.Block, *types.PartSet, *types.Commit)
Base() int64
Height() int64
Base() uint64
Height() uint64
}
// BlockchainReactor handles fast sync protocol.
@@ -40,8 +40,8 @@ type BlockchainReactor struct {
logger log.Logger
mtx tmsync.RWMutex
maxPeerHeight int64
syncHeight int64
maxPeerHeight uint64
syncHeight uint64
events chan Event // non-nil during a fast sync
reporter behaviour.Reporter
@@ -50,7 +50,7 @@ type BlockchainReactor struct {
}
type blockApplier interface {
ApplyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, int64, error)
ApplyBlock(state state.State, blockID types.BlockID, block *types.Block) (state.State, uint64, error)
}
// XXX: unify naming in this package around tmState

View File

@@ -157,11 +157,11 @@ func newScPeer(peerID p2p.NodeID) *scPeer {
// scheduler will attempt to schedule new block requests with `trySchedule`
// events and remove slow peers with `tryPrune` events.
type scheduler struct {
initHeight int64
initHeight uint64
// next block that needs to be processed. All blocks with smaller height are
// in Processed state.
height int64
height uint64
// lastAdvance tracks the last time a block execution happened.
// syncTimeout is the maximum time the scheduler waits to advance in the fast sync process before finishing.
@@ -197,7 +197,7 @@ func (sc scheduler) String() string {
sc.initHeight, sc.blockStates, sc.peers, sc.pendingBlocks, sc.pendingTime, sc.receivedBlocks)
}
func newScheduler(initHeight int64, startTime time.Time) *scheduler {
func newScheduler(initHeight uint64, startTime time.Time) *scheduler {
sc := scheduler{
initHeight: initHeight,
lastAdvance: startTime,

View File

@@ -7,6 +7,6 @@ import (
//go:generate mockery --case underscore --name BlockStore
type BlockStore interface {
LoadBlockMeta(height int64) *types.BlockMeta
LoadBlockCommit(height int64) *types.Commit
LoadBlockMeta(height uint64) *types.BlockMeta
LoadBlockCommit(height uint64) *types.Commit
}

View File

@@ -372,7 +372,7 @@ func (c *Client) initializeWithTrustOptions(ctx context.Context, options TrustOp
// - header has not been verified yet and is therefore not in the store
//
// Safe for concurrent use by multiple goroutines.
func (c *Client) TrustedLightBlock(height int64) (*types.LightBlock, error) {
func (c *Client) TrustedLightBlock(height uint64) (*types.LightBlock, error) {
height, err := c.compareWithLatestHeight(height)
if err != nil {
return nil, err
@@ -380,7 +380,7 @@ func (c *Client) TrustedLightBlock(height int64) (*types.LightBlock, error) {
return c.trustedStore.LightBlock(height)
}
func (c *Client) compareWithLatestHeight(height int64) (int64, error) {
func (c *Client) compareWithLatestHeight(height uint64) (uint64, error) {
latestHeight, err := c.LastTrustedHeight()
if err != nil {
return 0, fmt.Errorf("can't get last trusted height: %w", err)
@@ -805,7 +805,7 @@ func (c *Client) verifySkippingAgainstPrimary(
// there are no trusted headers.
//
// Safe for concurrent use by multiple goroutines.
func (c *Client) LastTrustedHeight() (int64, error) {
func (c *Client) LastTrustedHeight() (uint64, error) {
return c.trustedStore.LastLightBlockHeight()
}
@@ -813,7 +813,7 @@ func (c *Client) LastTrustedHeight() (int64, error) {
// there are no trusted headers.
//
// Safe for concurrent use by multiple goroutines.
func (c *Client) FirstTrustedHeight() (int64, error) {
func (c *Client) FirstTrustedHeight() (uint64, error) {
return c.trustedStore.FirstLightBlockHeight()
}
@@ -852,7 +852,7 @@ func (c *Client) Cleanup() error {
// cleanupAfter deletes all headers & validator sets after +height+. It also
// resets latestTrustedBlock to the latest header.
func (c *Client) cleanupAfter(height int64) error {
func (c *Client) cleanupAfter(height uint64) error {
prevHeight := c.latestTrustedBlock.Height
for {
@@ -972,7 +972,7 @@ func (c *Client) replacePrimaryProvider() error {
// lightBlockFromPrimary retrieves the lightBlock from the primary provider
// at the specified height. Handles dropout by the primary provider by swapping
// with an alternative provider.
func (c *Client) lightBlockFromPrimary(ctx context.Context, height int64) (*types.LightBlock, error) {
func (c *Client) lightBlockFromPrimary(ctx context.Context, height uint64) (*types.LightBlock, error) {
c.providerMutex.Lock()
l, err := c.primary.LightBlock(ctx, height)
c.providerMutex.Unlock()

View File

@@ -20,9 +20,9 @@ import (
func TestLightClientAttackEvidence_Lunatic(t *testing.T) {
// primary performs a lunatic attack
var (
latestHeight = int64(10)
latestHeight = uint64(10)
valSize = 5
divergenceHeight = int64(6)
divergenceHeight = uint64(6)
primaryHeaders = make(map[int64]*types.SignedHeader, latestHeight)
primaryValidators = make(map[int64]*types.ValidatorSet, latestHeight)
)

View File

@@ -50,8 +50,8 @@ var ErrFailedHeaderCrossReferencing = errors.New(
// ErrVerificationFailed means either sequential or skipping verification has
// failed to verify from header #1 to header #2 due to some reason.
type ErrVerificationFailed struct {
From int64
To int64
From uint64
To uint64
Reason error
}

View File

@@ -120,7 +120,7 @@ func makeVote(header *types.Header, valset *types.ValidatorSet,
return vote
}
func genHeader(chainID string, height int64, bTime time.Time, txs types.Txs,
func genHeader(chainID string, height uint64, bTime time.Time, txs types.Txs,
valset, nextValset *types.ValidatorSet, appHash, consHash, resHash []byte) *types.Header {
return &types.Header{

View File

@@ -19,7 +19,7 @@ type Provider interface {
// issues, an error will be returned.
// If there's no LightBlock for the given height, ErrLightBlockNotFound
// error is returned.
LightBlock(ctx context.Context, height int64) (*types.LightBlock, error)
LightBlock(ctx context.Context, height uint64) (*types.LightBlock, error)
// ReportEvidence reports an evidence of misbehavior.
ReportEvidence(context.Context, types.Evidence) error

View File

@@ -14,7 +14,7 @@ type Store interface {
// ValidatorSet (h: height).
//
// height must be > 0.
DeleteLightBlock(height int64) error
DeleteLightBlock(height uint64) error
// LightBlock returns the LightBlock that corresponds to the given
// height.
@@ -22,22 +22,22 @@ type Store interface {
// height must be > 0.
//
// If LightBlock is not found, ErrLightBlockNotFound is returned.
LightBlock(height int64) (*types.LightBlock, error)
LightBlock(height uint64) (*types.LightBlock, error)
// LastLightBlockHeight returns the last (newest) LightBlock height.
//
// If the store is empty, -1 and nil error are returned.
LastLightBlockHeight() (int64, error)
LastLightBlockHeight() (uint64, error)
// FirstLightBlockHeight returns the first (oldest) LightBlock height.
//
// If the store is empty, -1 and nil error are returned.
FirstLightBlockHeight() (int64, error)
FirstLightBlockHeight() (uint64, error)
// LightBlockBefore returns the LightBlock before a certain height.
//
// height must be > 0 && <= LastLightBlockHeight.
LightBlockBefore(height int64) (*types.LightBlock, error)
LightBlockBefore(height uint64) (*types.LightBlock, error)
// Prune removes headers & the associated validator sets when Store reaches a
// defined size (number of header & validator set pairs).

View File

@@ -31,7 +31,7 @@ type TrustOptions struct {
// Header's Height and Hash must both be provided to force the trusting of a
// particular header.
Height int64
Height uint64
Hash []byte
}

View File

@@ -68,7 +68,7 @@ func TestCacheAfterUpdate(t *testing.T) {
tx := types.Tx{byte(v)}
updateTxs = append(updateTxs, tx)
}
err := mempool.Update(int64(tcIndex), updateTxs, abciResponses(len(updateTxs), abci.CodeTypeOK), nil, nil)
err := mempool.Update(uint64(tcIndex), updateTxs, abciResponses(len(updateTxs), abci.CodeTypeOK), nil, nil)
require.NoError(t, err)
for _, v := range tc.reAddIndices {

View File

@@ -36,8 +36,8 @@ var newline = []byte("\n")
// be efficiently accessed by multiple concurrent readers.
type CListMempool struct {
// Atomic integers
height int64 // the last block Update()'d to
txsBytes int64 // total size of mempool, in bytes
height uint64 // the last block Update()'d to
txsBytes int64 // total size of mempool, in bytes
// notify listeners (ie. consensus) when txs are available
notifiedTxsAvailable bool
@@ -83,7 +83,7 @@ type CListMempoolOption func(*CListMempool)
func NewCListMempool(
config *cfg.MempoolConfig,
proxyAppConn proxy.AppConnMempool,
height int64,
height uint64,
options ...CListMempoolOption,
) *CListMempool {
mempool := &CListMempool{
@@ -577,7 +577,7 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {
// Lock() must be help by the caller during execution.
func (mem *CListMempool) Update(
height int64,
height uint64,
txs types.Txs,
deliverTxResponses []*abci.ResponseDeliverTx,
preCheck PreCheckFunc,
@@ -672,7 +672,7 @@ func (mem *CListMempool) recheckTxs() {
// mempoolTx is a transaction that successfully ran
type mempoolTx struct {
height int64 // height that this tx had been validated in
height uint64 // height that this tx had been validated in
gasWanted int64 // amount of gas this tx states it will require
tx types.Tx //
@@ -682,8 +682,8 @@ type mempoolTx struct {
}
// Height returns the height for this transaction
func (memTx *mempoolTx) Height() int64 {
return atomic.LoadInt64(&memTx.height)
func (memTx *mempoolTx) Height() uint64 {
return atomic.LoadUint64(&memTx.height)
}
//--------------------------------------------------------------------------------

View File

@@ -40,7 +40,7 @@ type Mempool interface {
// NOTE: this should be called *after* block is committed by consensus.
// NOTE: Lock/Unlock must be managed by caller
Update(
blockHeight int64,
blockHeight uint64,
blockTxs types.Txs,
deliverTxResponses []*abci.ResponseDeliverTx,
newPreFn PreCheckFunc,

View File

@@ -21,7 +21,7 @@ func (Mempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error
func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
func (Mempool) Update(
_ int64,
_ uint64,
_ types.Txs,
_ []*abci.ResponseDeliverTx,
_ mempl.PreCheckFunc,

View File

@@ -39,7 +39,7 @@ const (
// peer information. This should eventually be replaced with a message-oriented
// approach utilizing the p2p stack.
type PeerManager interface {
GetHeight(p2p.NodeID) int64
GetHeight(p2p.NodeID) uint64
}
// Reactor implements a service that contains mempool of txs that are broadcasted

View File

@@ -233,7 +233,7 @@ func TestCreateProposalBlock(t *testing.T) {
logger := log.TestingLogger()
const height int64 = 1
const height uint64 = 1
state, stateDB, privVals := state(1, height)
stateStore := sm.NewStore(stateDB)
maxBytes := 16384
@@ -325,7 +325,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
logger := log.TestingLogger()
const height int64 = 1
const height uint64 = 1
state, stateDB, _ := state(1, height)
stateStore := sm.NewStore(stateDB)
const maxBytes int64 = 16384
@@ -385,7 +385,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
logger := log.TestingLogger()
state, stateDB, _ := state(types.MaxVotesCount, int64(1))
state, stateDB, _ := state(types.MaxVotesCount, uint64(1))
stateStore := sm.NewStore(stateDB)
const maxBytes int64 = 1024 * 1024 * 2
state.ConsensusParams.Block.MaxBytes = maxBytes
@@ -522,7 +522,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
assert.Equal(t, customBlockchainReactor, n.Switch().Reactor("BLOCKCHAIN"))
}
func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
func state(nVals int, height uint64) (sm.State, dbm.DB, []types.PrivValidator) {
privVals := make([]types.PrivValidator, nVals)
vals := make([]types.GenesisValidator, nVals)
for i := 0; i < nVals; i++ {

View File

@@ -821,7 +821,7 @@ func (m *PeerManager) findUpgradeCandidate(id NodeID, score PeerScore) NodeID {
// consensus and mempool reactors. These dependencies should be removed from the
// reactors, and instead query this information independently via new P2P
// protocol additions.
func (m *PeerManager) GetHeight(peerID NodeID) int64 {
func (m *PeerManager) GetHeight(peerID NodeID) uint64 {
m.mtx.Lock()
defer m.mtx.Unlock()
@@ -837,7 +837,7 @@ func (m *PeerManager) GetHeight(peerID NodeID) int64 {
// consensus and mempool reactors. These dependencies should be removed from the
// reactors, and instead query this information independently via new P2P
// protocol additions.
func (m *PeerManager) SetHeight(peerID NodeID, height int64) error {
func (m *PeerManager) SetHeight(peerID NodeID, height uint64) error {
m.mtx.Lock()
defer m.mtx.Unlock()
@@ -1009,7 +1009,7 @@ type peerInfo struct {
// These fields are ephemeral, i.e. not persisted to the database.
Persistent bool
Height int64
Height uint64
}
// peerInfoFromProto converts a Protobuf PeerInfo message to a peerInfo,

View File

@@ -80,8 +80,8 @@ func TestBlockEvents(t *testing.T) {
}
})
var firstBlockHeight int64
for i := int64(0); i < 3; i++ {
var firstBlockHeight uint64
for i := uint64(0); i < 3; i++ {
event := <-eventCh
blockEvent, ok := event.Data.(types.EventDataNewBlock)
require.True(t, ok)

View File

@@ -10,11 +10,11 @@ import (
)
// Waiter is informed of current height, decided whether to quit early
type Waiter func(delta int64) (abort error)
type Waiter func(delta uint64) (abort error)
// DefaultWaitStrategy is the standard backoff algorithm,
// but you can plug in another one
func DefaultWaitStrategy(delta int64) (abort error) {
func DefaultWaitStrategy(delta uint64) (abort error) {
if delta > 10 {
return fmt.Errorf("waiting for %d blocks... aborting", delta)
} else if delta > 0 {
@@ -32,11 +32,11 @@ func DefaultWaitStrategy(delta int64) (abort error) {
//
// If waiter is nil, we use DefaultWaitStrategy, but you can also
// provide your own implementation
func WaitForHeight(c StatusClient, h int64, waiter Waiter) error {
func WaitForHeight(c StatusClient, h uint64, waiter Waiter) error {
if waiter == nil {
waiter = DefaultWaitStrategy
}
delta := int64(1)
delta := uint64(1)
for delta > 0 {
s, err := c.Status(context.Background())
if err != nil {

View File

@@ -51,7 +51,7 @@ func TestWaitForHeight(t *testing.T) {
// since we can't update in a background goroutine (test --race)
// we use the callback to update the status height
myWaiter := func(delta int64) error {
myWaiter := func(delta uint64) error {
// update the height for the next call
m.Call.Response = &ctypes.ResultStatus{SyncInfo: ctypes.SyncInfo{LatestBlockHeight: 15}}
return client.DefaultWaitStrategy(delta)

View File

@@ -374,7 +374,7 @@ func (c *baseRPCClient) Health(ctx context.Context) (*ctypes.ResultHealth, error
func (c *baseRPCClient) BlockchainInfo(
ctx context.Context,
minHeight,
maxHeight int64,
maxHeight uint64,
) (*ctypes.ResultBlockchainInfo, error) {
result := new(ctypes.ResultBlockchainInfo)
_, err := c.caller.Call(ctx, "blockchain",
@@ -395,7 +395,7 @@ func (c *baseRPCClient) Genesis(ctx context.Context) (*ctypes.ResultGenesis, err
return result, nil
}
func (c *baseRPCClient) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) {
func (c *baseRPCClient) Block(ctx context.Context, height *uint64) (*ctypes.ResultBlock, error) {
result := new(ctypes.ResultBlock)
params := make(map[string]interface{})
if height != nil {
@@ -422,7 +422,7 @@ func (c *baseRPCClient) BlockByHash(ctx context.Context, hash []byte) (*ctypes.R
func (c *baseRPCClient) BlockResults(
ctx context.Context,
height *int64,
height *uint64,
) (*ctypes.ResultBlockResults, error) {
result := new(ctypes.ResultBlockResults)
params := make(map[string]interface{})
@@ -436,7 +436,7 @@ func (c *baseRPCClient) BlockResults(
return result, nil
}
func (c *baseRPCClient) Commit(ctx context.Context, height *int64) (*ctypes.ResultCommit, error) {
func (c *baseRPCClient) Commit(ctx context.Context, height *uint64) (*ctypes.ResultCommit, error) {
result := new(ctypes.ResultCommit)
params := make(map[string]interface{})
if height != nil {
@@ -492,7 +492,7 @@ func (c *baseRPCClient) TxSearch(
func (c *baseRPCClient) Validators(
ctx context.Context,
height *int64,
height *uint64,
page,
perPage *int,
) (*ctypes.ResultValidators, error) {

View File

@@ -64,11 +64,11 @@ type ABCIClient interface {
// SignClient groups together the functionality needed to get valid signatures
// and prove anything about the chain.
type SignClient interface {
Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error)
Block(ctx context.Context, height *uint64) (*ctypes.ResultBlock, error)
BlockByHash(ctx context.Context, hash []byte) (*ctypes.ResultBlock, error)
BlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error)
Commit(ctx context.Context, height *int64) (*ctypes.ResultCommit, error)
Validators(ctx context.Context, height *int64, page, perPage *int) (*ctypes.ResultValidators, error)
BlockResults(ctx context.Context, height *uint64) (*ctypes.ResultBlockResults, error)
Commit(ctx context.Context, height *uint64) (*ctypes.ResultCommit, error)
Validators(ctx context.Context, height *uint64, page, perPage *int) (*ctypes.ResultValidators, error)
Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error)
TxSearch(ctx context.Context, query string, prove bool, page, perPage *int,
orderBy string) (*ctypes.ResultTxSearch, error)
@@ -77,7 +77,7 @@ type SignClient interface {
// HistoryClient provides access to data from genesis to now in large chunks.
type HistoryClient interface {
Genesis(context.Context) (*ctypes.ResultGenesis, error)
BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error)
BlockchainInfo(ctx context.Context, minHeight, maxHeight uint64) (*ctypes.ResultBlockchainInfo, error)
}
// StatusClient provides access to general chain info.

View File

@@ -123,7 +123,7 @@ func (c *Local) ConsensusState(ctx context.Context) (*ctypes.ResultConsensusStat
return core.ConsensusState(c.ctx)
}
func (c *Local) ConsensusParams(ctx context.Context, height *int64) (*ctypes.ResultConsensusParams, error) {
func (c *Local) ConsensusParams(ctx context.Context, height *uint64) (*ctypes.ResultConsensusParams, error) {
return core.ConsensusParams(c.ctx, height)
}
@@ -145,7 +145,7 @@ func (c *Local) DialPeers(
return core.UnsafeDialPeers(c.ctx, peers, persistent, unconditional, private)
}
func (c *Local) BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) {
func (c *Local) BlockchainInfo(ctx context.Context, minHeight, maxHeight uint64) (*ctypes.ResultBlockchainInfo, error) {
return core.BlockchainInfo(c.ctx, minHeight, maxHeight)
}
@@ -153,7 +153,7 @@ func (c *Local) Genesis(ctx context.Context) (*ctypes.ResultGenesis, error) {
return core.Genesis(c.ctx)
}
func (c *Local) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) {
func (c *Local) Block(ctx context.Context, height *uint64) (*ctypes.ResultBlock, error) {
return core.Block(c.ctx, height)
}
@@ -161,15 +161,15 @@ func (c *Local) BlockByHash(ctx context.Context, hash []byte) (*ctypes.ResultBlo
return core.BlockByHash(c.ctx, hash)
}
func (c *Local) BlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error) {
func (c *Local) BlockResults(ctx context.Context, height *uint64) (*ctypes.ResultBlockResults, error) {
return core.BlockResults(c.ctx, height)
}
func (c *Local) Commit(ctx context.Context, height *int64) (*ctypes.ResultCommit, error) {
func (c *Local) Commit(ctx context.Context, height *uint64) (*ctypes.ResultCommit, error) {
return core.Commit(c.ctx, height)
}
func (c *Local) Validators(ctx context.Context, height *int64, page, perPage *int) (*ctypes.ResultValidators, error) {
func (c *Local) Validators(ctx context.Context, height *uint64, page, perPage *int) (*ctypes.ResultValidators, error) {
return core.Validators(c.ctx, height, page, perPage)
}

View File

@@ -73,7 +73,7 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
return result, nil
}
func validatorAtHeight(h int64) *types.Validator {
func validatorAtHeight(h uint64) *types.Validator {
vals, err := env.StateStore.LoadValidators(h)
if err != nil {
return nil

View File

@@ -16,7 +16,7 @@ import (
type BlockStore interface {
Base() uint64
Height() uint64
Size() int64
Size() uint64
LoadBaseMeta() *types.BlockMeta
LoadBlockMeta(height uint64) *types.BlockMeta

View File

@@ -292,11 +292,7 @@ func (conR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
panic("Bad VoteSetBitsMessage field Type. Forgot to add a check in ValidateBasic?")
}
src.TrySend(VoteSetBitsChannel, MustEncode(&VoteSetBitsMessage{
Height: msg.Height,
Round: msg.Round,
Type: msg.Type,
BlockID: msg.BlockID,
Votes: ourVotes,
x
}))
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))