add extra tests

This commit is contained in:
Callum Waters
2021-08-26 10:44:44 +02:00
parent d71edfc960
commit 18982cd831
10 changed files with 193 additions and 28 deletions

View File

@@ -66,6 +66,7 @@ func (d *Dispatcher) LightBlock(ctx context.Context, height int64, peer types.No
}
}()
fmt.Println("awaiting for a response")
// wait for a response, cancel or timeout
select {
case resp := <-callCh:
@@ -187,10 +188,12 @@ func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.Li
return nil, provider.ErrLightBlockNotFound
}
case context.DeadlineExceeded, context.Canceled:
fmt.Println("context canceled")
return nil, err
case errPeerAlreadyBusy:
return nil, provider.ErrLightBlockNotFound
case errNoResponse:
fmt.Println("no response")
return nil, provider.ErrNoResponse
default: // errDisconnected
return nil, provider.ErrUnreliableProvider{Reason: err.Error()}

View File

@@ -41,7 +41,7 @@ func TestDispatcherBasic(t *testing.T) {
wg.Add(1)
go func(height int64) {
defer wg.Done()
lb, err := d.LightBlock(context.Background(), height, peers[height - 1])
lb, err := d.LightBlock(context.Background(), height, peers[height-1])
require.NoError(t, err)
require.NotNil(t, lb)
require.Equal(t, lb.Height, height)

View File

@@ -63,7 +63,7 @@ var (
MsgType: new(ssproto.Message),
Descriptor: &p2p.ChannelDescriptor{
ID: byte(LightBlockChannel),
Priority: 1,
Priority: 11,
SendQueueCapacity: 10,
RecvMessageCapacity: lightBlockMsgSize,
RecvBufferCapacity: 128,
@@ -114,7 +114,7 @@ const (
// lightBlockResponseTimeout is how long the dispatcher waits for a peer to
// return a light block
lightBlockResponseTimeout = 30 * time.Second
lightBlockResponseTimeout = 10 * time.Second
// consensusParamsResponseTimeout is the time the p2p state provider waits
// before performing a secondary call
@@ -130,11 +130,11 @@ const (
type Reactor struct {
service.BaseService
chainID string
chainID string
initialHeight int64
cfg config.StateSyncConfig
stateStore sm.Store
blockStore *store.BlockStore
cfg config.StateSyncConfig
stateStore sm.Store
blockStore *store.BlockStore
conn proxy.AppConnSnapshot
connQuery proxy.AppConnQuery
@@ -154,9 +154,9 @@ type Reactor struct {
// These will only be set when a state sync is in progress. It is used to feed
// received snapshots and chunks into the syncer and manage incoming and outgoing
// providers.
mtx tmsync.RWMutex
syncer *syncer
providers map[types.NodeID]*blockProvider
mtx tmsync.RWMutex
syncer *syncer
providers map[types.NodeID]*blockProvider
stateProvider StateProvider
}
@@ -178,7 +178,7 @@ func NewReactor(
tempDir string,
) *Reactor {
r := &Reactor{
chainID: chainID,
chainID: chainID,
cfg: cfg,
conn: conn,
connQuery: connQuery,
@@ -242,6 +242,7 @@ func (r *Reactor) OnStop() {
<-r.snapshotCh.Done()
<-r.chunkCh.Done()
<-r.blockCh.Done()
<-r.paramsCh.Done()
<-r.peerUpdates.Done()
}
@@ -250,19 +251,17 @@ func (r *Reactor) OnStop() {
// store and persist the commit at that height so that either consensus or
// blocksync can commence. It will then proceed to backfill the necessary amount
// of historical blocks before participating in consensus
func (r *Reactor) Sync(
ctx context.Context,
chainID string,
initialHeight int64,
) (sm.State, error) {
r.waitForEnoughPeers(ctx, 3)
func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
r.waitForEnoughPeers(ctx, 2)
r.mtx.Lock()
if r.syncer != nil {
r.mtx.Unlock()
return sm.State{}, errors.New("a state sync is already in progress")
}
r.initStateProvider(ctx, chainID, initialHeight)
if err := r.initStateProvider(ctx, r.chainID, r.initialHeight); err != nil {
return sm.State{}, err
}
r.syncer = newSyncer(
r.cfg,
@@ -979,7 +978,7 @@ func (r *Reactor) initStateProvider(ctx context.Context, chainID string, initial
if r.cfg.UseP2P {
spLogger.Info("Generating P2P state provider")
peers := r.peers.All()
providers := make([]provider.Provider, len(peers))
for idx, p := range peers {

View File

@@ -19,6 +19,8 @@ import (
"github.com/tendermint/tendermint/internal/statesync/mocks"
"github.com/tendermint/tendermint/internal/test/factory"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/proxy"
// "github.com/tendermint/tendermint/light"
"github.com/tendermint/tendermint/light/provider"
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
@@ -159,7 +161,8 @@ func setup(
)
// override the dispatcher with one with a shorter timeout
rts.reactor.dispatcher = NewDispatcher(rts.blockChannel.Out, 1*time.Second)
rts.reactor.dispatcher = NewDispatcher(rts.blockChannel.Out,
100*time.Millisecond)
rts.syncer = newSyncer(
*cfg,
@@ -183,6 +186,57 @@ func setup(
return rts
}
func TestReactor_Sync(t *testing.T) {
var snapshotHeight int64 = 7
rts := setup(t, nil, nil, nil, 2)
chain := buildLightBlockChain(t, 1, 10, time.Now())
// app accepts any snapshot
rts.conn.On("OfferSnapshotSync", ctx, mock.AnythingOfType("types.RequestOfferSnapshot")).
Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}, nil)
// app accepts every chunk
rts.conn.On("ApplySnapshotChunkSync", ctx, mock.AnythingOfType("types.RequestApplySnapshotChunk")).
Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
// app query returns valid state app hash
rts.connQuery.On("InfoSync", ctx, proxy.RequestInfo).Return(&abci.ResponseInfo{
AppVersion: 9,
LastBlockHeight: snapshotHeight,
LastBlockAppHash: chain[snapshotHeight+1].AppHash,
}, nil)
// store accepts state and validator sets
rts.stateStore.On("Bootstrap", mock.AnythingOfType("state.State")).Return(nil)
rts.stateStore.On("SaveValidatorSets", mock.AnythingOfType("int64"), mock.AnythingOfType("int64"),
mock.AnythingOfType("*types.ValidatorSet")).Return(nil)
closeCh := make(chan struct{})
defer close(closeCh)
go handleLightBlockRequests(t, chain, rts.blockOutCh,
rts.blockInCh, closeCh, 0)
go graduallyAddPeers(rts.peerUpdateCh, closeCh, 1*time.Second)
go handleSnapshotRequests(t, rts.snapshotOutCh, rts.snapshotInCh, closeCh, []snapshot{
{
Height: uint64(snapshotHeight),
Format: 1,
Chunks: 1,
},
})
go handleChunkRequests(t, rts.chunkOutCh, rts.chunkInCh, closeCh, []byte("abc"))
go handleConsensusParamsRequest(t, rts.paramsOutCh, rts.paramsInCh, closeCh)
// update the config to use the p2p provider
rts.reactor.cfg.UseP2P = true
rts.reactor.cfg.TrustHeight = 1
rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())
rts.reactor.cfg.DiscoveryTime = 2 * time.Second
_, err := rts.reactor.Sync(context.Background())
require.NoError(t, err)
}
func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) {
rts := setup(t, nil, nil, nil, 2)
@@ -688,3 +742,82 @@ func mockLB(t *testing.T, height int64, time time.Time, lastBlockID types.BlockI
ValidatorSet: currentVals,
}
}
func graduallyAddPeers(
peerUpdateCh chan p2p.PeerUpdate,
closeCh chan struct{},
interval time.Duration,
) {
ticker := time.NewTicker(interval)
for {
select {
case <-ticker.C:
fmt.Println("adding new peer")
peerUpdateCh <- p2p.PeerUpdate{
NodeID: factory.RandomNodeID(),
Status: p2p.PeerStatusUp,
}
case <-closeCh:
return
}
}
}
func handleSnapshotRequests(
t *testing.T,
receivingCh chan p2p.Envelope,
sendingCh chan p2p.Envelope,
closeCh chan struct{},
snapshots []snapshot,
) {
for {
select {
case envelope := <-receivingCh:
_, ok := envelope.Message.(*ssproto.SnapshotsRequest)
require.True(t, ok)
for _, snapshot := range snapshots {
sendingCh <- p2p.Envelope{
From: envelope.To,
Message: &ssproto.SnapshotsResponse{
Height: snapshot.Height,
Format: snapshot.Format,
Chunks: snapshot.Chunks,
Hash: snapshot.Hash,
Metadata: snapshot.Metadata,
},
}
}
case <-closeCh:
return
}
}
}
func handleChunkRequests(
t *testing.T,
receivingCh chan p2p.Envelope,
sendingCh chan p2p.Envelope,
closeCh chan struct{},
chunk []byte,
) {
for {
select {
case envelope := <-receivingCh:
msg, ok := envelope.Message.(*ssproto.ChunkRequest)
require.True(t, ok)
sendingCh <- p2p.Envelope{
From: envelope.To,
Message: &ssproto.ChunkResponse{
Height: msg.Height,
Format: msg.Format,
Index: msg.Index,
Chunk: chunk,
Missing: false,
},
}
case <-closeCh:
return
}
}
}

View File

@@ -87,6 +87,7 @@ func NewRPCStateProvider(
// AppHash implements StateProvider.
func (s *stateProviderRPC) AppHash(ctx context.Context, height uint64) ([]byte, error) {
fmt.Println("requesting app hash")
s.Lock()
defer s.Unlock()
@@ -333,7 +334,7 @@ func (s *stateProviderP2P) State(ctx context.Context, height uint64) (sm.State,
return state, nil
}
// addProvider dynamically adds a peer as a new witness. A limit of 6 providers is kept as a
// addProvider dynamically adds a peer as a new witness. A limit of 6 providers is kept as a
// heuristic. Too many overburdens the network and too little compromises the second layer of security.
func (s *stateProviderP2P) addProvider(p lightprovider.Provider) {
if len(s.lc.Witnesses()) < 6 {
@@ -350,9 +351,11 @@ func (s *stateProviderP2P) consensusParams(ctx context.Context, height int64) (t
if !ok {
panic("expected p2p state provider to use p2p block providers")
}
// extract the nodeID of the provider
peer, err := types.NewNodeID(p.String())
if err != nil {
return types.ConsensusParams{}, fmt.Errorf("invalid provider node id: %w", err)
return types.ConsensusParams{}, fmt.Errorf("invalid provider node id: %w, provider: %s", err, p.String())
}
select {

View File

@@ -40,7 +40,7 @@ var (
errRejectSender = errors.New("snapshot sender was rejected")
// errVerifyFailed is returned by Sync() when app hash or last height
// verification fails.
errVerifyFailed = errors.New("verification failed")
errVerifyFailed = errors.New("verification with app failed")
// errTimeout is returned by Sync() when we've waited too long to receive a chunk.
errTimeout = errors.New("timed out waiting for chunk")
// errNoSnapshots is returned by SyncAny() if no snapshots are found and discovery is disabled.

View File

@@ -12,3 +12,7 @@ func TestMakeHeader(t *testing.T) {
_, err := MakeHeader(&types.Header{})
assert.NoError(t, err)
}
func TestRandomNodeID(t *testing.T) {
assert.NotPanics(t, func() { RandomNodeID() })
}

View File

@@ -0,0 +1,25 @@
package factory
import (
"encoding/hex"
"strings"
"github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/types"
)
func NodeID(str string) types.NodeID {
id, err := types.NewNodeID(strings.Repeat(str, 2*types.NodeIDByteLength))
if err != nil {
panic(err)
}
return id
}
func RandomNodeID() types.NodeID {
id, err := types.NewNodeID(hex.EncodeToString(rand.Bytes( types.NodeIDByteLength)))
if err != nil {
panic(err)
}
return id
}

View File

@@ -682,8 +682,8 @@ func (n *nodeImpl) OnStart() error {
// FIXME: We shouldn't allow state sync to silently error out without
// bubbling up the error and gracefully shutting down the rest of the node
go func() {
n.Logger.Info("staring state sync")
state, err := n.stateSyncReactor.Sync(context.TODO(), state.ChainID, state.InitialHeight)
n.Logger.Info("starting state sync")
state, err := n.stateSyncReactor.Sync(context.TODO())
if err != nil {
n.Logger.Error("state sync failed", "err", err)
return

View File

@@ -1,10 +1,8 @@
[node.validator01]
snapshot_interval = 2
snapshot_interval = 3
[node.validator02]
snapshot_interval = 3
[node.validator03]
snapshot_interval = 3
[node.validator04]
state_sync = true
start_at = 5
persistent_peers = ["validator01", "validator02"]