add params proto messages

This commit is contained in:
Callum Waters
2021-08-06 14:48:25 +02:00
parent e5f9dd2736
commit 2bde896c13
14 changed files with 930 additions and 337 deletions

View File

@@ -29,7 +29,6 @@ import (
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/libs/strings"
tmtime "github.com/tendermint/tendermint/libs/time"
"github.com/tendermint/tendermint/light"
"github.com/tendermint/tendermint/privval"
tmgrpc "github.com/tendermint/tendermint/privval/grpc"
"github.com/tendermint/tendermint/proxy"
@@ -652,6 +651,8 @@ func (n *nodeImpl) OnStart() error {
}
// Run state sync
// TODO: We shouldn't run state sync if we already have state that has a
// LastBlockHeight that is not InitialHeight
if n.stateSync {
bcR, ok := n.bcReactor.(cs.BlockSyncReactor)
if !ok {
@@ -664,17 +665,53 @@ func (n *nodeImpl) OnStart() error {
return fmt.Errorf("unable to derive state: %w", err)
}
ssc := n.config.StateSync
sp, err := constructStateProvider(ssc, state, n.Logger.With("module", "light"))
n.Logger.Info("starting state sync...")
if err != nil {
return fmt.Errorf("failed to set up light client state provider: %w", err)
// TODO: we may want to move these events within the respective
// reactors.
// at the beginning of the statesync start, we use the initialHeight as the event height
// because of the statesync doesn't have the concreate state height before fetched the snapshot.
d := types.EventDataStateSyncStatus{Complete: false, Height: state.InitialHeight}
if err := n.eventBus.PublishEventStateSyncStatus(d); err != nil {
n.eventBus.Logger.Error("failed to emit the statesync start event", "err", err)
}
if err := startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, sp,
ssc, n.config.FastSyncMode, state.InitialHeight, n.eventBus); err != nil {
return fmt.Errorf("failed to start state sync: %w", err)
}
// FIXME: We shouldn't allow state sync to silently error out without
// bubbling up the error and gracefully shutting down the rest of the node
go func() {
state, err := n.stateSyncReactor.Sync(context.TODO(), state.ChainID, state.InitialHeight)
if err != nil {
n.Logger.Error("state sync failed", "err", err)
return
}
n.consensusReactor.SetStateSyncingMetrics(0)
d := types.EventDataStateSyncStatus{Complete: true, Height: state.LastBlockHeight}
if err := n.eventBus.PublishEventStateSyncStatus(d); err != nil {
n.eventBus.Logger.Error("failed to emit the statesync start event", "err", err)
}
// TODO: Some form of orchestrator is needed here between the state
// advancing reactors to be able to control which one of the three
// is running
if n.config.FastSyncMode {
// FIXME Very ugly to have these metrics bleed through here.
n.consensusReactor.SetBlockSyncingMetrics(1)
if err := bcR.SwitchToBlockSync(state); err != nil {
n.Logger.Error("failed to switch to block sync", "err", err)
return
}
d := types.EventDataBlockSyncStatus{Complete: false, Height: state.LastBlockHeight}
if err := n.eventBus.PublishEventBlockSyncStatus(d); err != nil {
n.eventBus.Logger.Error("failed to emit the block sync starting event", "err", err)
}
} else {
n.consensusReactor.SwitchToConsensus(state, true)
}
}()
}
return nil
@@ -1033,67 +1070,6 @@ func (n *nodeImpl) NodeInfo() types.NodeInfo {
return n.nodeInfo
}
// startStateSync starts an asynchronous state sync process, then switches to block sync mode.
func startStateSync(
ssR statesync.SyncReactor,
bcR cs.BlockSyncReactor,
conR cs.ConsSyncReactor,
sp statesync.StateProvider,
config *cfg.StateSyncConfig,
blockSync bool,
stateInitHeight int64,
eb *types.EventBus,
) error {
stateSyncLogger := eb.Logger.With("module", "statesync")
stateSyncLogger.Info("starting state sync...")
// at the beginning of the statesync start, we use the initialHeight as the event height
// because of the statesync doesn't have the concreate state height before fetched the snapshot.
d := types.EventDataStateSyncStatus{Complete: false, Height: stateInitHeight}
if err := eb.PublishEventStateSyncStatus(d); err != nil {
stateSyncLogger.Error("failed to emit the statesync start event", "err", err)
}
go func() {
state, err := ssR.Sync(context.TODO(), sp, config.DiscoveryTime)
if err != nil {
stateSyncLogger.Error("state sync failed", "err", err)
return
}
if err := ssR.Backfill(state); err != nil {
stateSyncLogger.Error("backfill failed; node has insufficient history to verify all evidence;"+
" proceeding optimistically...", "err", err)
}
conR.SetStateSyncingMetrics(0)
d := types.EventDataStateSyncStatus{Complete: true, Height: state.LastBlockHeight}
if err := eb.PublishEventStateSyncStatus(d); err != nil {
stateSyncLogger.Error("failed to emit the statesync start event", "err", err)
}
if blockSync {
// FIXME Very ugly to have these metrics bleed through here.
conR.SetBlockSyncingMetrics(1)
if err := bcR.SwitchToBlockSync(state); err != nil {
stateSyncLogger.Error("failed to switch to block sync", "err", err)
return
}
d := types.EventDataBlockSyncStatus{Complete: false, Height: state.LastBlockHeight}
if err := eb.PublishEventBlockSyncStatus(d); err != nil {
stateSyncLogger.Error("failed to emit the block sync starting event", "err", err)
}
} else {
conR.SwitchToConsensus(state, true)
}
}()
return nil
}
// genesisDocProvider returns a GenesisDoc.
// It allows the GenesisDoc to be pulled from sources other than the
// filesystem, for instance from a distributed key-value store cluster.
@@ -1276,24 +1252,3 @@ func getChannelsFromShim(reactorShim *p2p.ReactorShim) map[p2p.ChannelID]*p2p.Ch
return channels
}
func constructStateProvider(
ssc *cfg.StateSyncConfig,
state sm.State,
logger log.Logger,
) (statesync.StateProvider, error) {
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
defer cancel()
to := light.TrustOptions{
Period: ssc.TrustPeriod,
Height: ssc.TrustHeight,
Hash: ssc.TrustHashBytes(),
}
return statesync.NewLightClientStateProvider(
ctx,
state.ChainID, state.Version, state.InitialHeight,
ssc.RPCServers, to, logger,
)
}