statesync provider

Callum Waters
2021-08-26 18:24:09 +02:00
parent ffe02e5209
commit 25aa63ab9f
14 changed files with 116 additions and 123 deletions

View File

@@ -884,16 +884,26 @@ func (cfg *MempoolConfig) ValidateBasic() error {
type StateSyncConfig struct {
Enable bool `mapstructure:"enable"`
TempDir string `mapstructure:"temp-dir"`
// Light blocks needed for state verification can be obtained either via
// the P2P layer or RPC layer.
UseP2P bool `mapstructure:"use-p2p"`
RPCServers []string `mapstructure:"rpc-servers"`
TrustPeriod time.Duration `mapstructure:"trust-period"`
TrustHeight int64 `mapstructure:"trust-height"`
TrustHash string `mapstructure:"trust-hash"`
// the P2P layer or RPC layer. Default is RPC
UseP2P bool `mapstructure:"use-p2p"`
// If using RPC, at least two endpoints need to be provided
RPCServers []string `mapstructure:"rpc-servers"`
// Source of trust. State sync uses a light client for verification so
// some trusted source must be provided
TrustPeriod time.Duration `mapstructure:"trust-period"`
TrustHeight int64 `mapstructure:"trust-height"`
TrustHash string `mapstructure:"trust-hash"`
DiscoveryTime time.Duration `mapstructure:"discovery-time"`
ChunkRequestTimeout time.Duration `mapstructure:"chunk-request-timeout"`
Fetchers int32 `mapstructure:"fetchers"`
// Fetchers defines the number of parallel workers to fetch statesync chunks
// and light blocks
Fetchers int32 `mapstructure:"fetchers"`
}
func (cfg *StateSyncConfig) TrustHashBytes() []byte {
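The reworked comments above spell out the two light-block sources (P2P or RPC) and the trust parameters the light client needs. As a rough illustration only, a minimal sketch of an RPC-backed configuration might look like the following; the import path, endpoints, and trust values are placeholders/assumptions, not part of this commit:

```go
package main

import (
	"fmt"
	"time"

	"github.com/tendermint/tendermint/config" // assumed import path
)

func main() {
	// Hypothetical RPC-backed state sync configuration using the fields from
	// the struct above. All endpoint and trust values are placeholders.
	cfg := config.StateSyncConfig{
		Enable: true,
		// UseP2P is left false, so light blocks are fetched via the RPC layer;
		// at least two RPC endpoints must then be provided.
		RPCServers: []string{"http://validator01:26657", "http://validator02:26657"},
		// Source of trust for the light client used during verification.
		TrustPeriod: 168 * time.Hour,
		TrustHeight: 1000,
		TrustHash:   "A0B1C2D3", // placeholder; a real trusted header hash goes here
		// Parallel workers fetching snapshot chunks and light blocks.
		Fetchers: 4,
	}
	fmt.Println(cfg.Enable, len(cfg.RPCServers))
}
```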

View File

@@ -66,11 +66,9 @@ func (d *Dispatcher) LightBlock(ctx context.Context, height int64, peer types.No
}
}()
fmt.Println("awaiting for a response")
// wait for a response, cancel or timeout
select {
case resp := <-callCh:
fmt.Printf("received response, height %d peer %v\n", height, peer)
return resp, nil
case <-ctx.Done():
@@ -102,25 +100,19 @@ func (d *Dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.Ligh
d.calls[peer] = ch
// send request
fmt.Printf("sending request dispatch, height %d peer %v\n", height, peer)
d.requestCh <- p2p.Envelope{
To: peer,
Message: &ssproto.LightBlockRequest{
Height: uint64(height),
},
}
fmt.Printf("sent request dispatch, height %d peer %v\n", height, peer)
return ch, nil
}
// Respond allows the underlying process that receives requests on the
// requestCh to respond with the respective light block
func (d *Dispatcher) Respond(lb *proto.LightBlock, peer types.NodeID) error {
if lb != nil {
fmt.Printf("trying to respond with light block for height %d from %v\n", lb.SignedHeader.Header.Height, peer)
} else {
fmt.Println("responded with empty block")
}
d.mtx.Lock()
defer d.mtx.Unlock()
@@ -162,6 +154,10 @@ func (d *Dispatcher) Stop() {
//
// TODO: This should probably be moved over to the light package but as we're
// not yet officially supporting p2p light clients we'll leave this here for now.
//
// NOTE: blockProvider will return an error with concurrent calls. However, we don't
// need a mutex because a light client (and the backfill process) will never call a
// method more than once at the same time
type blockProvider struct {
peer types.NodeID
chainID string
@@ -180,7 +176,6 @@ func NewBlockProvider(peer types.NodeID, chainID string, dispatcher *Dispatcher)
// LightBlock fetches a light block from the peer at a specified height returning either a light block
// or an appropriate error. Concurrently unsafe
func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.LightBlock, error) {
fmt.Println("fetching block for block provider")
lb, err := p.dispatcher.LightBlock(ctx, height, p.peer)
switch err {
case nil:
@@ -188,12 +183,10 @@ func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.Li
return nil, provider.ErrLightBlockNotFound
}
case context.DeadlineExceeded, context.Canceled:
fmt.Println("context canceled")
return nil, err
case errPeerAlreadyBusy:
return nil, provider.ErrLightBlockNotFound
case errNoResponse:
fmt.Println("no response")
return nil, provider.ErrNoResponse
default: // errDisconnected
return nil, provider.ErrUnreliableProvider{Reason: err.Error()}
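The Dispatcher above correlates a single outstanding light-block request per peer with its response through a per-peer channel, and the blockProvider wraps that into the light client's provider interface. Below is a small, self-contained sketch of that request/response correlation pattern, with string stand-ins for the real NodeID, LightBlock, and p2p envelope types; it is illustrative, not the actual implementation:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// dispatcher allows one outstanding request per peer, tracked in a map of
// peer -> pending response channel guarded by a mutex.
type dispatcher struct {
	mtx   sync.Mutex
	calls map[string]chan string
}

var errPeerAlreadyBusy = errors.New("peer already has an outstanding request")

func (d *dispatcher) Request(ctx context.Context, peer string) (string, error) {
	d.mtx.Lock()
	if _, ok := d.calls[peer]; ok {
		d.mtx.Unlock()
		return "", errPeerAlreadyBusy
	}
	ch := make(chan string, 1)
	d.calls[peer] = ch
	d.mtx.Unlock()

	// In the real reactor this is where the LightBlockRequest envelope is sent.
	select {
	case resp := <-ch: // wait for a response, cancel or timeout
		return resp, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// Respond is called by the process that receives responses from the network;
// it routes the payload back to the waiting caller for that peer.
func (d *dispatcher) Respond(peer, payload string) {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	if ch, ok := d.calls[peer]; ok {
		ch <- payload
		delete(d.calls, peer)
	}
}

func main() {
	d := &dispatcher{calls: make(map[string]chan string)}
	go func() {
		time.Sleep(10 * time.Millisecond)
		d.Respond("peer1", "light block at height 10")
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	resp, err := d.Request(ctx, "peer1")
	fmt.Println(resp, err)
}
```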

View File

@@ -107,7 +107,7 @@ const (
chunkMsgSize = int(16e6) // ~16MB
// lightBlockMsgSize is the maximum size of a lightBlockResponseMessage
lightBlockMsgSize = int(1e7) // ~10MB
lightBlockMsgSize = int(1e8) // ~100MB
// paramMsgSize is the maximum size of a paramsResponseMessage
paramMsgSize = int(1e5) // ~100kb
@@ -659,14 +659,12 @@ func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error {
return err
}
if lb == nil {
r.Logger.Info("returning nil light block", "height", msg.Height)
r.blockCh.Out <- p2p.Envelope{
To: envelope.From,
Message: &ssproto.LightBlockResponse{
LightBlock: nil,
},
}
r.Logger.Info("sent light block response", "height", msg.Height)
return nil
}
@@ -684,14 +682,16 @@ func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error {
LightBlock: lbproto,
},
}
r.Logger.Info("sent light block response", "height", lb.SignedHeader.Header.Height)
case *ssproto.LightBlockResponse:
r.Logger.Info("received light block response")
if err := r.dispatcher.Respond(msg.LightBlock, envelope.From); err != nil {
r.Logger.Error("error processing light block response", "err", err)
var height int64 = 0
if msg.LightBlock != nil {
height = msg.LightBlock.SignedHeader.Header.Height
}
r.Logger.Info("received light block response", "peer", envelope.From, "height", height)
if err := r.dispatcher.Respond(msg.LightBlock, envelope.From); err != nil {
r.Logger.Error("error processing light block response", "err", err, "height", height)
}
r.Logger.Info("processed light block response")
default:
return fmt.Errorf("received unknown message: %T", msg)
@@ -812,9 +812,6 @@ func (r *Reactor) processCh(ch *p2p.Channel, chName string) {
for {
select {
case envelope := <-ch.In:
if chName == "light block" {
fmt.Println("received p2p message for light block")
}
if err := r.handleMessage(ch.ID, envelope); err != nil {
r.Logger.Error(fmt.Sprintf("failed to process %s message", chName),
"ch_id", ch.ID, "envelope", envelope, "err", err)
@@ -978,8 +975,6 @@ func (r *Reactor) initStateProvider(ctx context.Context, chainID string, initial
spLogger := r.Logger.With("module", "stateprovider")
if r.cfg.UseP2P {
spLogger.Info("Generating P2P state provider")
peers := r.peers.All()
providers := make([]provider.Provider, len(peers))
for idx, p := range peers {
@@ -990,7 +985,6 @@ func (r *Reactor) initStateProvider(ctx context.Context, chainID string, initial
if err != nil {
return err
}
spLogger.Info("Finished generating P2P state provider")
} else {
r.stateProvider, err = NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.RPCServers, to, spLogger)
if err != nil {

View File

@@ -231,8 +231,9 @@ func TestReactor_Sync(t *testing.T) {
rts.reactor.cfg.UseP2P = true
rts.reactor.cfg.TrustHeight = 1
rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())
rts.reactor.cfg.DiscoveryTime = 2 * time.Second
rts.reactor.cfg.DiscoveryTime = 1 * time.Second
// Run state sync
_, err := rts.reactor.Sync(context.Background())
require.NoError(t, err)
}
@@ -752,7 +753,6 @@ func graduallyAddPeers(
for {
select {
case <-ticker.C:
fmt.Println("adding new peer")
peerUpdateCh <- p2p.PeerUpdate{
NodeID: factory.RandomNodeID(),
Status: p2p.PeerStatusUp,

View File

@@ -45,9 +45,9 @@ var (
errTimeout = errors.New("timed out waiting for chunk")
// errNoSnapshots is returned by SyncAny() if no snapshots are found and discovery is disabled.
errNoSnapshots = errors.New("no suitable snapshots found")
// errStateCommitTimeout is returned by Sync() when the timeout for retrieving
// errStateProviderTimeout is returned by Sync() when the timeout for retrieving
// tendermint state or the commit is exceeded
errStateCommitTimeout = errors.New("timed out trying to retrieve state and commit")
errStateProviderTimeout = errors.New("state provider timed out")
)
// syncer runs a state sync against an ABCI app. Use either SyncAny() to automatically attempt to
@@ -118,19 +118,6 @@ func (s *syncer) AddChunk(chunk *chunk) (bool, error) {
// AddSnapshot adds a snapshot to the snapshot pool. It returns true if a new, previously unseen
// snapshot was accepted and added.
func (s *syncer) AddSnapshot(peerID types.NodeID, snapshot *snapshot) (bool, error) {
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
defer cancel()
// Fetch the app hash corresponding to the snapshot
// TODO: We do all this computation for each and every snapshot we receive (even after
// attempting to fetch the chunks and restore state). We should only do this when a snapshot
// is selected and the actual sync begins
appHash, err := s.stateProvider.AppHash(ctx, snapshot.Height)
if err != nil {
return false, fmt.Errorf("failed to get app hash: %w", err)
}
snapshot.trustedAppHash = appHash
added, err := s.snapshots.Add(peerID, snapshot)
if err != nil {
return false, err
@@ -195,7 +182,7 @@ func (s *syncer) SyncAny(
if discoveryTime == 0 {
return sm.State{}, nil, errNoSnapshots
}
requestSnapshots()
// requestSnapshots()
s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
time.Sleep(discoveryTime)
continue
@@ -246,7 +233,7 @@ func (s *syncer) SyncAny(
s.logger.Info("Snapshot sender rejected", "peer", peer)
}
case errors.Is(err, errStateCommitTimeout):
case errors.Is(err, errStateProviderTimeout):
s.logger.Info("Timed out retrieving state and commit, rejecting and retrying...", "height", snapshot.Height)
s.snapshots.Reject(snapshot)
@@ -280,8 +267,21 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueu
s.mtx.Unlock()
}()
hctx, hcancel := context.WithTimeout(ctx, 30*time.Second)
defer hcancel()
// Fetch the app hash corresponding to the snapshot
appHash, err := s.stateProvider.AppHash(hctx, snapshot.Height)
if err != nil {
if err == context.DeadlineExceeded && ctx.Err() == nil {
return sm.State{}, nil, errStateProviderTimeout
}
return sm.State{}, nil, fmt.Errorf("failed to get app hash: %w", err)
}
snapshot.trustedAppHash = appHash
// Offer snapshot to ABCI app.
err := s.offerSnapshot(ctx, snapshot)
err = s.offerSnapshot(ctx, snapshot)
if err != nil {
return sm.State{}, nil, err
}
@@ -293,7 +293,7 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueu
go s.fetchChunks(fetchCtx, snapshot, chunks)
}
pctx, pcancel := context.WithTimeout(ctx, 30*time.Second)
pctx, pcancel := context.WithTimeout(ctx, 1*time.Minute)
defer pcancel()
// Optimistically build new state, so we don't discover any light client failures at the end.
@@ -301,7 +301,7 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueu
if err != nil {
// check if the state provider context exceeded its deadline
if err == context.DeadlineExceeded && ctx.Err() == nil {
return sm.State{}, nil, errStateCommitTimeout
return sm.State{}, nil, errStateProviderTimeout
}
return sm.State{}, nil, fmt.Errorf("failed to build new state: %w", err)
@@ -310,7 +310,7 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueu
if err != nil {
// check if the state provider context exceeded its deadline
if err == context.DeadlineExceeded && ctx.Err() == nil {
return sm.State{}, nil, errStateCommitTimeout
return sm.State{}, nil, errStateProviderTimeout
}
return sm.State{}, nil, fmt.Errorf("failed to fetch commit: %w", err)
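The Sync() changes above move the AppHash fetch out of AddSnapshot and bound each state provider call with its own child context, translating a timeout into errStateProviderTimeout only when the parent context is still alive. A minimal sketch of that error-mapping pattern follows; the function name and timeout parameter are illustrative, not the commit's API (the real syncer uses fixed 30-second and 1-minute deadlines):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errStateProviderTimeout = errors.New("state provider timed out")

// fetchAppHash bounds the state provider call with a child context and maps
// context.DeadlineExceeded to errStateProviderTimeout only if the parent
// context has not been cancelled, i.e. it was the child deadline that fired.
func fetchAppHash(ctx context.Context, timeout time.Duration,
	appHash func(context.Context) ([]byte, error)) ([]byte, error) {

	hctx, hcancel := context.WithTimeout(ctx, timeout)
	defer hcancel()

	h, err := appHash(hctx)
	if err != nil {
		if err == context.DeadlineExceeded && ctx.Err() == nil {
			// Retryable: the caller can reject this snapshot and try another.
			return nil, errStateProviderTimeout
		}
		return nil, fmt.Errorf("failed to get app hash: %w", err)
	}
	return h, nil
}

func main() {
	slow := func(c context.Context) ([]byte, error) {
		<-c.Done() // simulate a state provider that never answers in time
		return nil, context.DeadlineExceeded
	}
	_, err := fetchAppHash(context.Background(), 50*time.Millisecond, slow)
	fmt.Println(err) // state provider timed out
}
```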

View File

@@ -17,7 +17,7 @@ func NodeID(str string) types.NodeID {
}
func RandomNodeID() types.NodeID {
id, err := types.NewNodeID(hex.EncodeToString(rand.Bytes( types.NodeIDByteLength)))
id, err := types.NewNodeID(hex.EncodeToString(rand.Bytes(types.NodeIDByteLength)))
if err != nil {
panic(err)
}

View File

@@ -724,51 +724,32 @@ func TestClient_BackwardsVerification(t *testing.T) {
}
{
testCases := []struct {
headers map[int64]*types.SignedHeader
vals map[int64]*types.ValidatorSet
}{
{
// 7) provides incorrect height
headers: map[int64]*types.SignedHeader{
2: keys.GenSignedHeader(chainID, 1, bTime.Add(30*time.Minute), nil, vals, vals,
hash("app_hash"), hash("cons_hash"), hash("results_hash"), 0, len(keys)),
3: h3,
},
vals: valSet,
},
{
// 8) provides incorrect hash
headers: map[int64]*types.SignedHeader{
2: keys.GenSignedHeader(chainID, 2, bTime.Add(30*time.Minute), nil, vals, vals,
hash("app_hash2"), hash("cons_hash23"), hash("results_hash30"), 0, len(keys)),
3: h3,
},
vals: valSet,
},
// 8) provides incorrect hash
headers := map[int64]*types.SignedHeader{
2: keys.GenSignedHeader(chainID, 2, bTime.Add(30*time.Minute), nil, vals, vals,
hash("app_hash2"), hash("cons_hash23"), hash("results_hash30"), 0, len(keys)),
3: h3,
}
vals := valSet
mockNode := mockNodeFromHeadersAndVals(headers, vals)
c, err := light.NewClient(
ctx,
chainID,
light.TrustOptions{
Period: 1 * time.Hour,
Height: 3,
Hash: h3.Hash(),
},
mockNode,
[]provider.Provider{mockNode},
dbs.New(dbm.NewMemDB()),
light.Logger(log.TestingLogger()),
)
require.NoError(t, err)
for idx, tc := range testCases {
mockNode := mockNodeFromHeadersAndVals(tc.headers, tc.vals)
c, err := light.NewClient(
ctx,
chainID,
light.TrustOptions{
Period: 1 * time.Hour,
Height: 3,
Hash: h3.Hash(),
},
mockNode,
[]provider.Provider{mockNode},
dbs.New(dbm.NewMemDB()),
light.Logger(log.TestingLogger()),
)
require.NoError(t, err, idx)
_, err = c.VerifyLightBlockAtHeight(ctx, 2, bTime.Add(1*time.Hour).Add(1*time.Second))
assert.Error(t, err, idx)
mockNode.AssertExpectations(t)
}
_, err = c.VerifyLightBlockAtHeight(ctx, 2, bTime.Add(1*time.Hour).Add(1*time.Second))
assert.Error(t, err)
mockNode.AssertExpectations(t)
}
}

View File

@@ -32,7 +32,7 @@ var (
// FIXME: v2 disabled due to flake
nodeBlockSyncs = uniformChoice{"v0"} // "v2"
nodeMempools = uniformChoice{"v0", "v1"}
nodeStateSyncs = uniformChoice{false, true}
nodeStateSyncs = uniformChoice{e2e.StateSyncDisabled, e2e.StateSyncP2P, e2e.StateSyncRPC}
nodePersistIntervals = uniformChoice{0, 1, 5}
nodeSnapshotIntervals = uniformChoice{0, 3}
nodeRetainBlocks = uniformChoice{0, int(e2e.EvidenceAgeHeight), int(e2e.EvidenceAgeHeight) + 5}
@@ -275,13 +275,17 @@ func generateNode(
PrivvalProtocol: nodePrivvalProtocols.Choose(r).(string),
BlockSync: nodeBlockSyncs.Choose(r).(string),
Mempool: nodeMempools.Choose(r).(string),
StateSync: nodeStateSyncs.Choose(r).(bool) && startAt > 0,
StateSync: e2e.StateSyncDisabled,
PersistInterval: ptrUint64(uint64(nodePersistIntervals.Choose(r).(int))),
SnapshotInterval: uint64(nodeSnapshotIntervals.Choose(r).(int)),
RetainBlocks: uint64(nodeRetainBlocks.Choose(r).(int)),
Perturb: nodePerturbations.Choose(r),
}
if startAt > 0 {
node.StateSync = nodeStateSyncs.Choose(r).(string)
}
// If this node is forced to be an archive node, retain all blocks and
// enable state sync snapshotting.
if forceArchive {
@@ -310,7 +314,7 @@ func generateNode(
}
}
if node.StateSync {
if node.StateSync != e2e.StateSyncDisabled {
node.BlockSync = "v0"
}

View File

@@ -55,6 +55,7 @@ retain_blocks = 7
[node.validator04]
abci_protocol = "builtin"
snapshot_interval = 5
database = "rocksdb"
persistent_peers = ["validator01"]
perturb = ["pause"]

View File

@@ -117,7 +117,8 @@ type ManifestNode struct {
// block hashes and RPC servers. At least one node in the network must have
// SnapshotInterval set to non-zero, and the state syncing node must have
// StartAt set to an appropriate height where a snapshot is available.
StateSync bool `toml:"state_sync"`
// StateSync can either be "p2p" or "rpc" or an empty string to disable
StateSync string `toml:"state_sync"`
// PersistInterval specifies the height interval at which the application
// will persist state to disk. Defaults to 1 (every height), setting this to

View File

@@ -50,6 +50,10 @@ const (
EvidenceAgeHeight int64 = 7
EvidenceAgeTime time.Duration = 500 * time.Millisecond
StateSyncP2P = "p2p"
StateSyncRPC = "rpc"
StateSyncDisabled = ""
)
// Testnet represents a single testnet.
@@ -81,7 +85,7 @@ type Node struct {
StartAt int64
BlockSync string
Mempool string
StateSync bool
StateSync string
Database string
ABCIProtocol Protocol
PrivvalProtocol Protocol
@@ -333,6 +337,11 @@ func (n Node) Validate(testnet Testnet) error {
default:
return fmt.Errorf("invalid block sync setting %q", n.BlockSync)
}
switch n.StateSync {
case StateSyncDisabled, StateSyncP2P, StateSyncRPC:
default:
return fmt.Errorf("invalid state sync setting %q", n.StateSync)
}
switch n.Mempool {
case "", "v0", "v1":
default:
@@ -366,7 +375,7 @@ func (n Node) Validate(testnet Testnet) error {
return fmt.Errorf("cannot start at height %v lower than initial height %v",
n.StartAt, n.Testnet.InitialHeight)
}
if n.StateSync && n.StartAt == 0 {
if n.StateSync != StateSyncDisabled && n.StartAt == 0 {
return errors.New("state synced nodes cannot start at the initial height")
}
if n.RetainBlocks != 0 && n.RetainBlocks < uint64(EvidenceAgeHeight) {
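Since state sync is now a three-valued string setting in the e2e manifest, the validation above accepts only the known modes and requires state-synced nodes to start late enough for a snapshot to exist. A compact standalone sketch of that rule, with the constants duplicated locally for illustration (the real ones live in the e2e package):

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the string-valued state sync modes introduced above.
const (
	StateSyncP2P      = "p2p"
	StateSyncRPC      = "rpc"
	StateSyncDisabled = ""
)

// validateStateSync mirrors the checks added to Node.Validate: only the three
// known modes are accepted, and a state-synced node must start after the
// initial height so a snapshot exists for it to restore from.
func validateStateSync(mode string, startAt int64) error {
	switch mode {
	case StateSyncDisabled, StateSyncP2P, StateSyncRPC:
	default:
		return fmt.Errorf("invalid state sync setting %q", mode)
	}
	if mode != StateSyncDisabled && startAt == 0 {
		return errors.New("state synced nodes cannot start at the initial height")
	}
	return nil
}

func main() {
	fmt.Println(validateStateSync(StateSyncP2P, 100)) // <nil>
	fmt.Println(validateStateSync("full", 100))       // invalid state sync setting "full"
	fmt.Println(validateStateSync(StateSyncRPC, 0))   // cannot start at the initial height
}
```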

View File

@@ -302,23 +302,23 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
cfg.BlockSync.Version = node.BlockSync
}
if node.StateSync {
switch node.StateSync {
case e2e.StateSyncP2P:
cfg.StateSync.Enable = true
cfg.StateSync.UseP2P = true
/*
cfg.StateSync.RPCServers = []string{}
for _, peer := range node.Testnet.ArchiveNodes() {
if peer.Name == node.Name {
continue
}
cfg.StateSync.RPCServers = append(cfg.StateSync.RPCServers, peer.AddressRPC())
case e2e.StateSyncRPC:
cfg.StateSync.Enable = true
cfg.StateSync.RPCServers = []string{}
for _, peer := range node.Testnet.ArchiveNodes() {
if peer.Name == node.Name {
continue
}
cfg.StateSync.RPCServers = append(cfg.StateSync.RPCServers, peer.AddressRPC())
}
if len(cfg.StateSync.RPCServers) < 2 {
return nil, errors.New("unable to find 2 suitable state sync RPC servers")
}
*/
if len(cfg.StateSync.RPCServers) < 2 {
return nil, errors.New("unable to find 2 suitable state sync RPC servers")
}
}
cfg.P2P.Seeds = ""

View File

@@ -58,7 +58,7 @@ func Start(testnet *e2e.Testnet) error {
// Update any state sync nodes with a trusted height and hash
for _, node := range nodeQueue {
if node.StateSync || node.Mode == e2e.ModeLight {
if node.StateSync != e2e.StateSyncDisabled || node.Mode == e2e.ModeLight {
err = UpdateConfigStateSync(node, block.Height, blockID.Hash.Bytes())
if err != nil {
return err

View File

@@ -34,7 +34,7 @@ func TestBlock_Header(t *testing.T) {
}
// the first blocks after state sync come from the backfill process
// and are therefore not complete
if node.StateSync && block.Header.Height <= first+e2e.EvidenceAgeHeight+1 {
if node.StateSync != e2e.StateSyncDisabled && block.Header.Height <= first+e2e.EvidenceAgeHeight+1 {
continue
}
if block.Header.Height > last {
@@ -70,7 +70,7 @@ func TestBlock_Range(t *testing.T) {
switch {
// if the node state synced we ignore any assertions because it's hard to know how far back
// the node ran reverse sync for
case node.StateSync:
case node.StateSync != e2e.StateSyncDisabled:
break
case node.RetainBlocks > 0 && int64(node.RetainBlocks) < (last-node.Testnet.InitialHeight+1):
// Delta handles race conditions in reading first/last heights.
@@ -83,7 +83,7 @@ func TestBlock_Range(t *testing.T) {
}
for h := first; h <= last; h++ {
if node.StateSync && h <= first+e2e.EvidenceAgeHeight+1 {
if node.StateSync != e2e.StateSyncDisabled && h <= first+e2e.EvidenceAgeHeight+1 {
continue
}
resp, err := client.Block(ctx, &(h))