Mirror of https://github.com/tendermint/tendermint.git (synced 2026-02-10 05:50:19 +00:00)

Compare commits: wb/remove-... → callum/p2p (20 commits)

| SHA1 |
|---|
| f47414f0a8 |
| e61145111b |
| 4c7f416c3f |
| 8955915de4 |
| a62c886a86 |
| 31a9c5f667 |
| 5baf7a7f0c |
| 0e45a1d0eb |
| 87f241fbe7 |
| c7cfaf9a1b |
| 25aa63ab9f |
| ffe02e5209 |
| 18982cd831 |
| d71edfc960 |
| d44d281d80 |
| 45b5349317 |
| aeb6058210 |
| f0ccd0fcd1 |
| 39c54f5f34 |
| 2bde896c13 |
@@ -151,6 +151,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
- [state/privval] \#6578 No GetPubKey retry beyond the proposal/voting window (@JayT106)
- [rpc] \#6615 Add TotalGasUsed to block_results response (@crypto-facs)
- [cmd/tendermint/commands] \#6623 replace `$HOME/.some/test/dir` with `t.TempDir` (@tanyabouman)
- [statesync] \#6807 Implement P2P state provider as an alternative to RPC (@cmwaters)

### BUG FIXES
@@ -884,15 +884,28 @@ func (cfg *MempoolConfig) ValidateBasic() error {

// StateSyncConfig defines the configuration for the Tendermint state sync service
type StateSyncConfig struct {
	Enable bool `mapstructure:"enable"`
	TempDir string `mapstructure:"temp-dir"`
	RPCServers []string `mapstructure:"rpc-servers"`
	TrustPeriod time.Duration `mapstructure:"trust-period"`
	TrustHeight int64 `mapstructure:"trust-height"`
	TrustHash string `mapstructure:"trust-hash"`
	Enable bool `mapstructure:"enable"`
	TempDir string `mapstructure:"temp-dir"`

	// Light blocks needed for state verification can be obtained either via
	// the P2P layer or RPC layer. Default is RPC
	UseP2P bool `mapstructure:"use-p2p"`

	// If using RPC, at least two endpoints need to be provided
	RPCServers []string `mapstructure:"rpc-servers"`

	// Source of trust. State sync uses a light client for verification so
	// some trusted source must be provided
	TrustPeriod time.Duration `mapstructure:"trust-period"`
	TrustHeight int64 `mapstructure:"trust-height"`
	TrustHash string `mapstructure:"trust-hash"`

	DiscoveryTime time.Duration `mapstructure:"discovery-time"`
	ChunkRequestTimeout time.Duration `mapstructure:"chunk-request-timeout"`
	Fetchers int32 `mapstructure:"fetchers"`

	// Fetchers defines the amount of parallel workers to fetch statesync chunks
	// and light blocks
	Fetchers int32 `mapstructure:"fetchers"`
}

func (cfg *StateSyncConfig) TrustHashBytes() []byte {

@@ -922,17 +935,21 @@ func TestStateSyncConfig() *StateSyncConfig {

// ValidateBasic performs basic validation.
func (cfg *StateSyncConfig) ValidateBasic() error {
	if cfg.Enable {
		if len(cfg.RPCServers) == 0 {
			return errors.New("rpc-servers is required")
		}
		// If we're not using the P2P stack then we need to validate the
		// RPCServers
		if !cfg.UseP2P {
			if len(cfg.RPCServers) == 0 {
				return errors.New("rpc-servers is required")
			}

			if len(cfg.RPCServers) < 2 {
				return errors.New("at least two rpc-servers entries is required")
			}
		if len(cfg.RPCServers) < 2 {
			return errors.New("at least two rpc-servers entries is required")
		}

		for _, server := range cfg.RPCServers {
			if len(server) == 0 {
				return errors.New("found empty rpc-servers entry")
			for _, server := range cfg.RPCServers {
				if len(server) == 0 {
					return errors.New("found empty rpc-servers entry")
				}
			}
		}
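As a quick illustration of the new validation rules, here is a hedged sketch (not part of the change set): it uses `config.DefaultStateSyncConfig` and `ValidateBasic` as shown in this diff, and the RPC address is a placeholder.

```go
package main

import (
	"fmt"

	"github.com/tendermint/tendermint/config"
)

func main() {
	cfg := config.DefaultStateSyncConfig()
	cfg.Enable = true

	// With the P2P provider enabled the rpc-servers checks above are skipped
	// (other checks on the trust parameters may still apply).
	cfg.UseP2P = true
	fmt.Println(cfg.ValidateBasic())

	// With the RPC provider (use-p2p = false) at least two non-empty server
	// addresses must be supplied, otherwise ValidateBasic returns an error.
	cfg.UseP2P = false
	cfg.RPCServers = []string{"http://rpc-1.example.com:26657"} // placeholder address
	fmt.Println(cfg.ValidateBasic())
}
```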
@@ -426,15 +426,17 @@ ttl-num-blocks = {{ .Mempool.TTLNumBlocks }}

# starting from the height of the snapshot.
enable = {{ .StateSync.Enable }}

# RPC servers (comma-separated) for light client verification of the synced state machine and
# retrieval of state data for node bootstrapping. Also needs a trusted height and corresponding
# header hash obtained from a trusted source, and a period during which validators can be trusted.
#
# For Cosmos SDK-based chains, trust-period should usually be about 2/3 of the unbonding time (~2
# weeks) during which they can be financially punished (slashed) for misbehavior.
# State sync can source light blocks needed to verify state either through the P2P layer or via RPC
# When using the RPC, at least two (comma-separated) server addresses must be specified
use-p2p = {{ .StateSync.UseP2P }}
rpc-servers = "{{ StringsJoin .StateSync.RPCServers "," }}"

# The hash and height of a trusted block. Must be within the trust-period.
trust-height = {{ .StateSync.TrustHeight }}
trust-hash = "{{ .StateSync.TrustHash }}"

# For Cosmos SDK-based chains, trust-period should usually be about 2/3 of the unbonding time (~2
# weeks) during which they can be financially punished (slashed) for misbehavior.
trust-period = "{{ .StateSync.TrustPeriod }}"

# Time to spend discovering snapshots before initiating a restore.
@@ -36,8 +36,7 @@ func TestPubKeySecp256k1Address(t *testing.T) {
		addrBbz, _, _ := base58.CheckDecode(d.addr)
		addrB := crypto.Address(addrBbz)

		var priv secp256k1.PrivKey = secp256k1.PrivKey(privB)

		priv := secp256k1.PrivKey(privB)
		pubKey := priv.PubKey()
		pubT, _ := pubKey.(secp256k1.PubKey)
		pub := pubT
@@ -1096,7 +1096,7 @@ func (r *Reactor) handleDataMessage(envelope p2p.Envelope, msgI Message) error {
	}

	if r.WaitSync() {
		logger.Info("ignoring message received during sync", "msg", msgI)
		logger.Info("ignoring message received during sync", "msg", fmt.Sprintf("%T", msgI))
		return nil
	}
@@ -608,7 +608,7 @@ func prefixToBytes(prefix int64) []byte {
}

func keyCommitted(evidence types.Evidence) []byte {
	var height int64 = evidence.Height()
	var height = evidence.Height()
	key, err := orderedcode.Append(nil, prefixCommitted, height, string(evidence.Hash()))
	if err != nil {
		panic(err)

@@ -617,7 +617,7 @@ func keyCommitted(evidence types.Evidence) []byte {
}

func keyPending(evidence types.Evidence) []byte {
	var height int64 = evidence.Height()
	var height = evidence.Height()
	key, err := orderedcode.Append(nil, prefixPending, height, string(evidence.Hash()))
	if err != nil {
		panic(err)
@@ -274,8 +274,10 @@ loop:
}

func mockLBResp(t *testing.T, peer types.NodeID, height int64, time time.Time) lightBlockResponse {
	vals, pv := factory.RandValidatorSet(3, 10)
	_, _, lb := mockLB(t, height, time, factory.MakeBlockID(), vals, pv)
	return lightBlockResponse{
		block: mockLB(t, height, time, factory.MakeBlockID()),
		block: lb,
		peer: peer,
	}
}
@@ -19,156 +19,69 @@ var (
	errUnsolicitedResponse = errors.New("unsolicited light block response")
	errNoResponse = errors.New("peer failed to respond within timeout")
	errPeerAlreadyBusy = errors.New("peer is already processing a request")
	errDisconnected = errors.New("dispatcher has been disconnected")
	errDisconnected = errors.New("provider has been disconnected")
)

// dispatcher keeps a list of peers and allows concurrent requests for light
// blocks. NOTE: It is not the responsibility of the dispatcher to verify the
// light blocks.
type dispatcher struct {
	availablePeers *peerlist
	requestCh chan<- p2p.Envelope
	timeout time.Duration
// dispatcher multiplexes concurrent requests by multiple peers for light blocks.
// Only one request per peer can be sent at a time
// NOTE: It is not the responsibility of the dispatcher to verify the light blocks.
type Dispatcher struct {
	// the channel with which to send light block requests on
	requestCh chan<- p2p.Envelope
	// timeout for light block delivery (immutable)
	timeout time.Duration

	mtx sync.Mutex
	calls map[types.NodeID]chan *types.LightBlock
	mtx sync.Mutex
	// all pending calls that have been dispatched and are awaiting an answer
	calls map[types.NodeID]chan *types.LightBlock
	// signals whether the underlying reactor is still running
	running bool
}

func newDispatcher(requestCh chan<- p2p.Envelope, timeout time.Duration) *dispatcher {
	return &dispatcher{
		availablePeers: newPeerList(),
		timeout: timeout,
		requestCh: requestCh,
		calls: make(map[types.NodeID]chan *types.LightBlock),
		running: true,
func NewDispatcher(requestCh chan<- p2p.Envelope, timeout time.Duration) *Dispatcher {
	return &Dispatcher{
		timeout: timeout,
		requestCh: requestCh,
		calls: make(map[types.NodeID]chan *types.LightBlock),
		running: true,
	}
}

// LightBlock uses the request channel to fetch a light block from the next peer
// in a list, tracks the call and waits for the reactor to pass along the response
func (d *dispatcher) LightBlock(ctx context.Context, height int64) (*types.LightBlock, types.NodeID, error) {
	d.mtx.Lock()
	// check to see that the dispatcher is connected to at least one peer
	if d.availablePeers.Len() == 0 && len(d.calls) == 0 {
		d.mtx.Unlock()
		return nil, "", errNoConnectedPeers
	}
	d.mtx.Unlock()

	// fetch the next peer id in the list and request a light block from that
	// peer
	peer := d.availablePeers.Pop(ctx)
	lb, err := d.lightBlock(ctx, height, peer)
	return lb, peer, err
}

// Providers turns the dispatcher into a set of providers (per peer) which can
// be used by a light client
func (d *dispatcher) Providers(chainID string, timeout time.Duration) []provider.Provider {
	d.mtx.Lock()
	defer d.mtx.Unlock()

	providers := make([]provider.Provider, d.availablePeers.Len())
	peers := d.availablePeers.Peers()
	for index, peer := range peers {
		providers[index] = &blockProvider{
			peer: peer,
			dispatcher: d,
			chainID: chainID,
			timeout: timeout,
		}
	}
	return providers
}

func (d *dispatcher) stop() {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	d.running = false
	for peer, call := range d.calls {
		close(call)
		delete(d.calls, peer)
	}
}

func (d *dispatcher) start() {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	d.running = true
}

func (d *dispatcher) lightBlock(ctx context.Context, height int64, peer types.NodeID) (*types.LightBlock, error) {
func (d *Dispatcher) LightBlock(ctx context.Context, height int64, peer types.NodeID) (*types.LightBlock, error) {
	// dispatch the request to the peer
	callCh, err := d.dispatch(peer, height)
	if err != nil {
		return nil, err
	}

	// clean up the call after a response is returned
	defer func() {
		d.mtx.Lock()
		defer d.mtx.Unlock()
		if call, ok := d.calls[peer]; ok {
			delete(d.calls, peer)
			close(call)
		}
	}()

	// wait for a response, cancel or timeout
	select {
	case resp := <-callCh:
		return resp, nil

	case <-ctx.Done():
		d.release(peer)
		return nil, nil
		return nil, ctx.Err()

	case <-time.After(d.timeout):
		d.release(peer)
		return nil, errNoResponse
	}
}

// respond allows the underlying process which receives requests on the
// requestCh to respond with the respective light block
func (d *dispatcher) respond(lb *proto.LightBlock, peer types.NodeID) error {
	d.mtx.Lock()
	defer d.mtx.Unlock()

	// check that the response came from a request
	answerCh, ok := d.calls[peer]
	if !ok {
		// this can also happen if the response came in after the timeout
		return errUnsolicitedResponse
	}
	// release the peer after returning the response
	defer d.availablePeers.Append(peer)
	defer close(answerCh)
	defer delete(d.calls, peer)

	if lb == nil {
		answerCh <- nil
		return nil
	}

	block, err := types.LightBlockFromProto(lb)
	if err != nil {
		fmt.Println("error with converting light block")
		return err
	}

	answerCh <- block
	return nil
}

func (d *dispatcher) addPeer(peer types.NodeID) {
	d.availablePeers.Append(peer)
}

func (d *dispatcher) removePeer(peer types.NodeID) {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	if _, ok := d.calls[peer]; ok {
		delete(d.calls, peer)
	} else {
		d.availablePeers.Remove(peer)
	}
}

// dispatch takes a peer and allocates it a channel so long as it's not already
// busy and the receiving channel is still running. It then dispatches the message
func (d *dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.LightBlock, error) {
func (d *Dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.LightBlock, error) {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	ch := make(chan *types.LightBlock, 1)

@@ -179,7 +92,7 @@ func (d *dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.Ligh
		return ch, errDisconnected
	}

	// this should happen only if we add the same peer twice (somehow)
	// check if a request for the same peer has already been made
	if _, ok := d.calls[peer]; ok {
		close(ch)
		return ch, errPeerAlreadyBusy

@@ -193,47 +106,100 @@ func (d *dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.Ligh
			Height: uint64(height),
		},
	}

	return ch, nil
}

// release appends the peer back to the list and deletes the allocated call so
// that a new call can be made to that peer
func (d *dispatcher) release(peer types.NodeID) {
// respond allows the underlying process which receives requests on the
// requestCh to respond with the respective light block
func (d *Dispatcher) Respond(lb *proto.LightBlock, peer types.NodeID) error {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	if call, ok := d.calls[peer]; ok {
		close(call)
		delete(d.calls, peer)

	// check that the response came from a request
	answerCh, ok := d.calls[peer]
	if !ok {
		// this can also happen if the response came in after the timeout
		return errUnsolicitedResponse
	}

	if lb == nil {
		answerCh <- nil
		return nil
	}

	block, err := types.LightBlockFromProto(lb)
	if err != nil {
		return err
	}

	answerCh <- block
	return nil
}

func (d *Dispatcher) Stop() {
	d.mtx.Lock()
	defer d.mtx.Unlock()
	d.running = false
	for peer, call := range d.calls {
		delete(d.calls, peer)
		close(call)
	}
	d.availablePeers.Append(peer)
}

//----------------------------------------------------------------

// blockProvider is a p2p based light provider which uses a dispatcher connected
// BlockProvider is a p2p based light provider which uses a dispatcher connected
// to the state sync reactor to serve light blocks to the light client
//
// TODO: This should probably be moved over to the light package but as we're
// not yet officially supporting p2p light clients we'll leave this here for now.
type blockProvider struct {
//
// NOTE: BlockProvider will return an error with concurrent calls. However, we don't
// need a mutex because a light client (and the backfill process) will never call a
// method more than once at the same time
type BlockProvider struct {
	peer types.NodeID
	chainID string
	timeout time.Duration
	dispatcher *dispatcher
	dispatcher *Dispatcher
}

func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.LightBlock, error) {
	// FIXME: The provider doesn't know if the dispatcher is still connected to
	// that peer. If the connection is dropped for whatever reason the
	// dispatcher needs to be able to relay this back to the provider so it can
	// return ErrConnectionClosed instead of ErrNoResponse
	ctx, cancel := context.WithTimeout(ctx, p.timeout)
	defer cancel()
	lb, _ := p.dispatcher.lightBlock(ctx, height, p.peer)
	if lb == nil {
// Creates a block provider which implements the light client Provider interface.
func NewBlockProvider(peer types.NodeID, chainID string, dispatcher *Dispatcher) *BlockProvider {
	return &BlockProvider{
		peer: peer,
		chainID: chainID,
		dispatcher: dispatcher,
	}
}

// LightBlock fetches a light block from the peer at a specified height returning either a light block
// or an appropriate error. Concurrently unsafe
func (p *BlockProvider) LightBlock(ctx context.Context, height int64) (*types.LightBlock, error) {
	lb, err := p.dispatcher.LightBlock(ctx, height, p.peer)
	switch err {
	case nil:
		if lb == nil {
			return nil, provider.ErrLightBlockNotFound
		}
	case context.DeadlineExceeded, context.Canceled:
		return nil, err
	case errPeerAlreadyBusy:
		return nil, provider.ErrLightBlockNotFound
	case errNoResponse:
		return nil, provider.ErrNoResponse
	default: // errDisconnected
		return nil, provider.ErrUnreliableProvider{Reason: err.Error()}
	}

	// check that the height requested is the same one returned
	if lb.Height != height {
		return nil, provider.ErrBadLightBlock{
			Reason: fmt.Errorf("expected height %d, got height %d", height, lb.Height),
		}
	}

	// perform basic validation
	if err := lb.ValidateBasic(p.chainID); err != nil {
		return nil, provider.ErrBadLightBlock{Reason: err}
	}

@@ -245,37 +211,37 @@ func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.Li
// attacks. This is a no op as there currently isn't a way to wire this up to
// the evidence reactor (we should endeavor to do this in the future but for now
// it's not critical for backwards verification)
func (p *blockProvider) ReportEvidence(ctx context.Context, ev types.Evidence) error {
func (p *BlockProvider) ReportEvidence(ctx context.Context, ev types.Evidence) error {
	return nil
}

// String implements stringer interface
func (p *blockProvider) String() string { return string(p.peer) }
func (p *BlockProvider) String() string { return string(p.peer) }

//----------------------------------------------------------------

// peerList is a rolling list of peers. This is used to distribute the load of
// retrieving blocks over all the peers the reactor is connected to
type peerlist struct {
type peerList struct {
	mtx sync.Mutex
	peers []types.NodeID
	waiting []chan types.NodeID
}

func newPeerList() *peerlist {
	return &peerlist{
func newPeerList() *peerList {
	return &peerList{
		peers: make([]types.NodeID, 0),
		waiting: make([]chan types.NodeID, 0),
	}
}

func (l *peerlist) Len() int {
func (l *peerList) Len() int {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	return len(l.peers)
}

func (l *peerlist) Pop(ctx context.Context) types.NodeID {
func (l *peerList) Pop(ctx context.Context) types.NodeID {
	l.mtx.Lock()
	if len(l.peers) == 0 {
		// if we don't have any peers in the list we block until a peer is

@@ -299,7 +265,7 @@ func (l *peerlist) Pop(ctx context.Context) types.NodeID {
	return peer
}

func (l *peerlist) Append(peer types.NodeID) {
func (l *peerList) Append(peer types.NodeID) {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	if len(l.waiting) > 0 {

@@ -312,7 +278,7 @@ func (l *peerlist) Append(peer types.NodeID) {
	}
}

func (l *peerlist) Remove(peer types.NodeID) {
func (l *peerList) Remove(peer types.NodeID) {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	for i, p := range l.peers {

@@ -323,7 +289,7 @@ func (l *peerlist) Remove(peer types.NodeID) {
	}
}

func (l *peerlist) Peers() []types.NodeID {
func (l *peerList) All() []types.NodeID {
	l.mtx.Lock()
	defer l.mtx.Unlock()
	return l.peers
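For orientation, a minimal sketch of how the renamed `Dispatcher` and `BlockProvider` are intended to fit together, based only on the signatures shown above; the channel wiring, peer ID, and height are illustrative assumptions, and in a real node the reactor is what forwards requests and feeds back responses.

```go
package statesync

import (
	"context"
	"strings"
	"time"

	"github.com/tendermint/tendermint/internal/p2p"
	"github.com/tendermint/tendermint/types"
)

// exampleDispatcherWiring is a sketch, not part of the change set: it shows the
// intended call pattern for the Dispatcher/BlockProvider pair introduced above.
func exampleDispatcherWiring() {
	requestCh := make(chan p2p.Envelope, 100)
	d := NewDispatcher(requestCh, 10*time.Second)

	// In the real reactor, envelopes read from requestCh are sent to the peer,
	// and incoming LightBlockResponse messages are fed back with d.Respond.

	peer := types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength))
	p := NewBlockProvider(peer, "test-chain", d)

	// A light client (or the backfill routine) asks the provider for a block;
	// with no responder wired up this would eventually return provider.ErrNoResponse.
	lb, err := p.LightBlock(context.Background(), 5)
	_ = lb
	_ = err

	d.Stop()
}
```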
@@ -17,144 +17,126 @@ import (
	"github.com/tendermint/tendermint/types"
)

var (
	peer, _ = types.NewNodeID(strings.Repeat("a", 2*types.NodeIDByteLength))
)

func TestDispatcherBasic(t *testing.T) {
	t.Cleanup(leaktest.Check(t))
	numPeers := 5

	ch := make(chan p2p.Envelope, 100)
	closeCh := make(chan struct{})
	defer close(closeCh)

	d := newDispatcher(ch, 1*time.Second)

	d := NewDispatcher(ch, 1*time.Second)
	go handleRequests(t, d, ch, closeCh)

	peers := createPeerSet(5)
	for _, peer := range peers {
		d.addPeer(peer)
	}

	peers := createPeerSet(numPeers)
	wg := sync.WaitGroup{}

	// make a bunch of async requests and require that the correct responses are
	// given
	for i := 1; i < 10; i++ {
	for i := 0; i < numPeers; i++ {
		wg.Add(1)
		go func(height int64) {
			defer wg.Done()
			lb, peer, err := d.LightBlock(context.Background(), height)
			lb, err := d.LightBlock(context.Background(), height, peers[height-1])
			require.NoError(t, err)
			require.NotNil(t, lb)
			require.Equal(t, lb.Height, height)
			require.Contains(t, peers, peer)
		}(int64(i))
		}(int64(i + 1))
	}
	wg.Wait()

	// assert that all calls were responded to
	assert.Empty(t, d.calls)
}

func TestDispatcherReturnsNoBlock(t *testing.T) {
	t.Cleanup(leaktest.Check(t))
	ch := make(chan p2p.Envelope, 100)
	d := newDispatcher(ch, 1*time.Second)
	peerFromSet := createPeerSet(1)[0]
	d.addPeer(peerFromSet)
	d := NewDispatcher(ch, 1*time.Second)
	doneCh := make(chan struct{})

	go func() {
		<-ch
		require.NoError(t, d.respond(nil, peerFromSet))
		require.NoError(t, d.Respond(nil, peer))
		close(doneCh)
	}()

	lb, peerResult, err := d.LightBlock(context.Background(), 1)
	lb, err := d.LightBlock(context.Background(), 1, peer)
	<-doneCh

	require.Nil(t, lb)
	require.Nil(t, err)
	require.Equal(t, peerFromSet, peerResult)
}

func TestDispatcherErrorsWhenNoPeers(t *testing.T) {
func TestDispatcherTimeOutWaitingOnLightBlock(t *testing.T) {
	t.Cleanup(leaktest.Check(t))
	ch := make(chan p2p.Envelope, 100)
	d := newDispatcher(ch, 1*time.Second)
	d := NewDispatcher(ch, 1*time.Second)

	lb, peerResult, err := d.LightBlock(context.Background(), 1)
	ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancelFunc()

	lb, err := d.LightBlock(ctx, 1, peer)

	require.Error(t, err)
	require.Equal(t, context.DeadlineExceeded, err)
	require.Nil(t, lb)
	require.Empty(t, peerResult)
	require.Equal(t, errNoConnectedPeers, err)
}

func TestDispatcherReturnsBlockOncePeerAvailable(t *testing.T) {
func TestDispatcherTimeOutWaitingOnLightBlock2(t *testing.T) {
	t.Cleanup(leaktest.Check(t))
	dispatcherRequestCh := make(chan p2p.Envelope, 100)
	d := newDispatcher(dispatcherRequestCh, 1*time.Second)
	peerFromSet := createPeerSet(1)[0]
	d.addPeer(peerFromSet)
	ctx := context.Background()
	wrapped, cancelFunc := context.WithCancel(ctx)
	ch := make(chan p2p.Envelope, 100)
	d := NewDispatcher(ch, 10*time.Millisecond)

	doneCh := make(chan struct{})
	go func() {
		lb, peerResult, err := d.LightBlock(wrapped, 1)
		require.Nil(t, lb)
		require.Equal(t, peerFromSet, peerResult)
		require.Nil(t, err)
	ctx, cancelFunc := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancelFunc()

		// calls to dispatcher.Lightblock write into the dispatcher's requestCh.
		// we read from the requestCh here to unblock the requestCh for future
		// calls.
		<-dispatcherRequestCh
		close(doneCh)
	}()
	cancelFunc()
	<-doneCh
	lb, err := d.LightBlock(ctx, 1, peer)

	go func() {
		<-dispatcherRequestCh
		lb := &types.LightBlock{}
		asProto, err := lb.ToProto()
		require.Nil(t, err)
		err = d.respond(asProto, peerFromSet)
		require.Nil(t, err)
	}()

	lb, peerResult, err := d.LightBlock(context.Background(), 1)

	require.NotNil(t, lb)
	require.Equal(t, peerFromSet, peerResult)
	require.Nil(t, err)
	require.Error(t, err)
	require.Equal(t, errNoResponse, err)
	require.Nil(t, lb)
}

func TestDispatcherProviders(t *testing.T) {
	t.Cleanup(leaktest.Check(t))

	ch := make(chan p2p.Envelope, 100)
	chainID := "state-sync-test"
	chainID := "test-chain"
	closeCh := make(chan struct{})
	defer close(closeCh)

	d := newDispatcher(ch, 1*time.Second)

	d := NewDispatcher(ch, 5*time.Second)
	go handleRequests(t, d, ch, closeCh)

	peers := createPeerSet(5)
	for _, peer := range peers {
		d.addPeer(peer)
	providers := make([]*BlockProvider, len(peers))
	for idx, peer := range peers {
		providers[idx] = NewBlockProvider(peer, chainID, d)
	}

	providers := d.Providers(chainID, 5*time.Second)
	require.Len(t, providers, 5)

	for i, p := range providers {
		bp, ok := p.(*blockProvider)
		require.True(t, ok)
		assert.Equal(t, bp.String(), string(peers[i]))
		assert.Equal(t, string(peers[i]), p.String(), i)
		lb, err := p.LightBlock(context.Background(), 10)
		assert.Error(t, err)
		assert.Nil(t, lb)
		assert.NoError(t, err)
		assert.NotNil(t, lb)
	}
}

func TestDispatcherStopped(t *testing.T) {
	ch := make(chan p2p.Envelope, 100)
	d := NewDispatcher(ch, 5*time.Second)
	d.Stop()

	_, err := d.LightBlock(context.Background(), 1, peer)
	require.Error(t, err)
}

func TestPeerListBasic(t *testing.T) {
	t.Cleanup(leaktest.Check(t))
	peerList := newPeerList()

@@ -166,7 +148,7 @@ func TestPeerListBasic(t *testing.T) {
		peerList.Append(peer)
	}

	for idx, peer := range peerList.Peers() {
	for idx, peer := range peerList.All() {
		assert.Equal(t, peer, peerSet[idx])
	}

@@ -178,13 +160,22 @@ func TestPeerListBasic(t *testing.T) {
	}
	assert.Equal(t, half, peerList.Len())

	// removing a peer that doesn't exist should not change the list
	peerList.Remove(types.NodeID("lp"))
	assert.Equal(t, half, peerList.Len())

	// removing a peer that exists should decrease the list size by one
	peerList.Remove(peerSet[half])
	half++
	assert.Equal(t, peerSet[half], peerList.Pop(ctx))
	assert.Equal(t, numPeers-half-1, peerList.Len())

	// popping the next peer should work as expected
	assert.Equal(t, peerSet[half+1], peerList.Pop(ctx))
	assert.Equal(t, numPeers-half-2, peerList.Len())

	// append the two peers back
	peerList.Append(peerSet[half])
	peerList.Append(peerSet[half+1])
	assert.Equal(t, half, peerList.Len())
}

func TestPeerListBlocksWhenEmpty(t *testing.T) {

@@ -277,9 +268,28 @@ func TestPeerListConcurrent(t *testing.T) {
	}
}

func TestPeerListRemove(t *testing.T) {
	peerList := newPeerList()
	numPeers := 10

	peerSet := createPeerSet(numPeers)
	for _, peer := range peerSet {
		peerList.Append(peer)
	}

	for _, peer := range peerSet {
		peerList.Remove(peer)
		for _, p := range peerList.All() {
			require.NotEqual(t, p, peer)
		}
		numPeers--
		require.Equal(t, numPeers, peerList.Len())
	}
}

// handleRequests is a helper function usually run in a separate go routine to
// imitate the expected responses of the reactor wired to the dispatcher
func handleRequests(t *testing.T, d *dispatcher, ch chan p2p.Envelope, closeCh chan struct{}) {
func handleRequests(t *testing.T, d *Dispatcher, ch chan p2p.Envelope, closeCh chan struct{}) {
	t.Helper()
	for {
		select {

@@ -288,8 +298,9 @@ func handleRequests(t *testing.T, d *dispatcher, ch chan p2p.Envelope, closeCh c
			peer := request.To
			resp := mockLBResp(t, peer, int64(height), time.Now())
			block, _ := resp.block.ToProto()
			require.NoError(t, d.respond(block, resp.peer))
			require.NoError(t, d.Respond(block, resp.peer))
		case <-closeCh:
			d.Stop()
			return
		}
	}
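The `handleRequests` helper is only partially visible in the hunks above; a hedged sketch of the full responder loop it implements (assumed to live in the statesync test package next to the helpers shown here; the request message type used to read the height is an assumption):

```go
// handleRequestsSketch imitates the reactor: read each request envelope,
// fabricate a light block at the requested height, and feed it back through
// Dispatcher.Respond.
func handleRequestsSketch(t *testing.T, d *Dispatcher, ch chan p2p.Envelope, closeCh chan struct{}) {
	t.Helper()
	for {
		select {
		case request := <-ch:
			height := request.Message.(*ssproto.LightBlockRequest).Height // assumed request type
			resp := mockLBResp(t, request.To, int64(height), time.Now())
			block, _ := resp.block.ToProto()
			require.NoError(t, d.Respond(block, resp.peer))
		case <-closeCh:
			d.Stop()
			return
		}
	}
}
```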
@@ -1,50 +0,0 @@
package statesync

import (
	"context"
	"time"

	mock "github.com/stretchr/testify/mock"
	state "github.com/tendermint/tendermint/state"
)

// MockSyncReactor is an autogenerated mock type for the SyncReactor type.
// Because of the stateprovider uses in Sync(), we use package statesync instead of mocks.
type MockSyncReactor struct {
	mock.Mock
}

// Backfill provides a mock function with given fields: _a0
func (_m *MockSyncReactor) Backfill(_a0 state.State) error {
	ret := _m.Called(_a0)

	var r0 error
	if rf, ok := ret.Get(0).(func(state.State) error); ok {
		r0 = rf(_a0)
	} else {
		r0 = ret.Error(0)
	}

	return r0
}

// Sync provides a mock function with given fields: _a0, _a1, _a2
func (_m *MockSyncReactor) Sync(_a0 context.Context, _a1 StateProvider, _a2 time.Duration) (state.State, error) {
	ret := _m.Called(_a0, _a1, _a2)

	var r0 state.State
	if rf, ok := ret.Get(0).(func(context.Context, StateProvider, time.Duration) state.State); ok {
		r0 = rf(_a0, _a1, _a2)
	} else {
		r0 = ret.Get(0).(state.State)
	}

	var r1 error
	if rf, ok := ret.Get(1).(func(context.Context, StateProvider, time.Duration) error); ok {
		r1 = rf(_a0, _a1, _a2)
	} else {
		r1 = ret.Error(1)
	}

	return r0, r1
}
@@ -16,6 +16,8 @@ import (
	"github.com/tendermint/tendermint/internal/p2p"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/libs/service"
	"github.com/tendermint/tendermint/light"
	"github.com/tendermint/tendermint/light/provider"
	ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
	"github.com/tendermint/tendermint/proxy"
	sm "github.com/tendermint/tendermint/state"

@@ -61,13 +63,24 @@
			MsgType: new(ssproto.Message),
			Descriptor: &p2p.ChannelDescriptor{
				ID: byte(LightBlockChannel),
				Priority: 2,
				Priority: 5,
				SendQueueCapacity: 10,
				RecvMessageCapacity: lightBlockMsgSize,
				RecvBufferCapacity: 128,
				MaxSendBytes: 400,
			},
		},
		ParamsChannel: {
			MsgType: new(ssproto.Message),
			Descriptor: &p2p.ChannelDescriptor{
				ID: byte(ParamsChannel),
				Priority: 2,
				SendQueueCapacity: 10,
				RecvMessageCapacity: paramMsgSize,
				RecvBufferCapacity: 128,
				MaxSendBytes: 400,
			},
		},
	}
)

@@ -81,6 +94,9 @@ const (
	// LightBlockChannel exchanges light blocks
	LightBlockChannel = p2p.ChannelID(0x62)

	// ParamsChannel exchanges consensus params
	ParamsChannel = p2p.ChannelID(0x63)

	// recentSnapshots is the number of recent snapshots to send and receive per peer.
	recentSnapshots = 10

@@ -91,31 +107,34 @@ const (
	chunkMsgSize = int(16e6) // ~16MB

	// lightBlockMsgSize is the maximum size of a lightBlockResponseMessage
	lightBlockMsgSize = int(1e7) // ~10MB
	lightBlockMsgSize = int(1e8) // ~10MB

	// paramMsgSize is the maximum size of a paramsResponseMessage
	paramMsgSize = int(1e5) // ~100kb

	// lightBlockResponseTimeout is how long the dispatcher waits for a peer to
	// return a light block
	lightBlockResponseTimeout = 30 * time.Second
	lightBlockResponseTimeout = 10 * time.Second

	// consensusParamsResponseTimeout is the time the p2p state provider waits
	// before performing a secondary call
	consensusParamsResponseTimeout = 5 * time.Second

	// maxLightBlockRequestRetries is the amount of retries acceptable before
	// the backfill process aborts
	maxLightBlockRequestRetries = 20
)

// SyncReactor defines an interface used for testing abilities of node.startStateSync.
type SyncReactor interface {
	Sync(context.Context, StateProvider, time.Duration) (sm.State, error)
	Backfill(sm.State) error
}

// Reactor handles state sync, both restoring snapshots for the local node and
// serving snapshots for other nodes.
type Reactor struct {
	service.BaseService

	cfg config.StateSyncConfig
	stateStore sm.Store
	blockStore *store.BlockStore
	chainID string
	initialHeight int64
	cfg config.StateSyncConfig
	stateStore sm.Store
	blockStore *store.BlockStore

	conn proxy.AppConnSnapshot
	connQuery proxy.AppConnQuery

@@ -123,15 +142,22 @@ type Reactor struct {
	snapshotCh *p2p.Channel
	chunkCh *p2p.Channel
	blockCh *p2p.Channel
	paramsCh *p2p.Channel
	peerUpdates *p2p.PeerUpdates
	closeCh chan struct{}

	dispatcher *dispatcher
	// Dispatcher is used to mutex light block requests and responses over multiple
	// block providers used by the p2p state provider and in reverse sync.
	dispatcher *Dispatcher
	peers *peerList

	// This will only be set when a state sync is in progress. It is used to feed
	// received snapshots and chunks into the sync.
	mtx tmsync.RWMutex
	syncer *syncer
	// These will only be set when a state sync is in progress. It is used to feed
	// received snapshots and chunks into the syncer and manage incoming and outgoing
	// providers.
	mtx tmsync.RWMutex
	syncer *syncer
	providers map[types.NodeID]*BlockProvider
	stateProvider StateProvider
}

// NewReactor returns a reference to a new state sync reactor, which implements
@@ -139,29 +165,36 @@ type Reactor struct {
// and querying, references to p2p Channels and a channel to listen for peer
// updates on. Note, the reactor will close all p2p Channels when stopping.
func NewReactor(
	chainID string,
	initialHeight int64,
	cfg config.StateSyncConfig,
	logger log.Logger,
	conn proxy.AppConnSnapshot,
	connQuery proxy.AppConnQuery,
	snapshotCh, chunkCh, blockCh *p2p.Channel,
	snapshotCh, chunkCh, blockCh, paramsCh *p2p.Channel,
	peerUpdates *p2p.PeerUpdates,
	stateStore sm.Store,
	blockStore *store.BlockStore,
	tempDir string,
) *Reactor {
	r := &Reactor{
		cfg: cfg,
		conn: conn,
		connQuery: connQuery,
		snapshotCh: snapshotCh,
		chunkCh: chunkCh,
		blockCh: blockCh,
		peerUpdates: peerUpdates,
		closeCh: make(chan struct{}),
		tempDir: tempDir,
		dispatcher: newDispatcher(blockCh.Out, lightBlockResponseTimeout),
		stateStore: stateStore,
		blockStore: blockStore,
		chainID: chainID,
		initialHeight: initialHeight,
		cfg: cfg,
		conn: conn,
		connQuery: connQuery,
		snapshotCh: snapshotCh,
		chunkCh: chunkCh,
		blockCh: blockCh,
		paramsCh: paramsCh,
		peerUpdates: peerUpdates,
		closeCh: make(chan struct{}),
		tempDir: tempDir,
		stateStore: stateStore,
		blockStore: blockStore,
		peers: newPeerList(),
		dispatcher: NewDispatcher(blockCh.Out, lightBlockResponseTimeout),
		providers: make(map[types.NodeID]*BlockProvider),
	}

	r.BaseService = *service.NewBaseService(logger, "StateSync", r)

@@ -187,9 +220,9 @@ func (r *Reactor) OnStart() error {

	go r.processBlockCh()

	go r.processPeerUpdates()
	go r.processParamsCh()

	r.dispatcher.start()
	go r.processPeerUpdates()

	return nil
}

@@ -198,7 +231,7 @@
// blocking until they all exit.
func (r *Reactor) OnStop() {
	// tell the dispatcher to stop sending any more requests
	r.dispatcher.stop()
	r.dispatcher.Stop()

	// Close closeCh to signal to all spawned goroutines to gracefully exit. All
	// p2p Channels should execute Close().

@@ -210,27 +243,25 @@ func (r *Reactor) OnStop() {
	<-r.snapshotCh.Done()
	<-r.chunkCh.Done()
	<-r.blockCh.Done()
	<-r.paramsCh.Done()
	<-r.peerUpdates.Done()
}

// Sync runs a state sync, fetching snapshots and providing chunks to the
// application. It also saves tendermint state and runs a backfill process to
// retrieve the necessary amount of headers, commits and validators sets to be
// able to process evidence and participate in consensus.
func (r *Reactor) Sync(
	ctx context.Context,
	stateProvider StateProvider,
	discoveryTime time.Duration,
) (sm.State, error) {
// application. At the close of the operation, Sync will bootstrap the state
// store and persist the commit at that height so that either consensus or
// blocksync can commence. It will then proceed to backfill the necessary amount
// of historical blocks before participating in consensus
func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
	r.waitForEnoughPeers(ctx, 2)
	r.mtx.Lock()
	if r.syncer != nil {
		r.mtx.Unlock()
		return sm.State{}, errors.New("a state sync is already in progress")
	}

	if stateProvider == nil {
		r.mtx.Unlock()
		return sm.State{}, errors.New("the stateProvider should not be nil when doing the state sync")
	if err := r.initStateProvider(ctx, r.chainID, r.initialHeight); err != nil {
		return sm.State{}, err
	}

	r.syncer = newSyncer(
@@ -238,12 +269,19 @@ func (r *Reactor) Sync(
		r.Logger,
		r.conn,
		r.connQuery,
		stateProvider,
		r.stateProvider,
		r.snapshotCh.Out,
		r.chunkCh.Out,
		r.tempDir,
	)
	r.mtx.Unlock()
	defer func() {
		r.mtx.Lock()
		// reset syncing objects at the close of Sync
		r.syncer = nil
		r.stateProvider = nil
		r.mtx.Unlock()
	}()

	requestSnapshotsHook := func() {
		// request snapshots from all currently connected peers

@@ -253,15 +291,11 @@
		}
	}

	state, commit, err := r.syncer.SyncAny(ctx, discoveryTime, requestSnapshotsHook)
	state, commit, err := r.syncer.SyncAny(ctx, r.cfg.DiscoveryTime, requestSnapshotsHook)
	if err != nil {
		return sm.State{}, err
	}

	r.mtx.Lock()
	r.syncer = nil
	r.mtx.Unlock()

	err = r.stateStore.Bootstrap(state)
	if err != nil {
		return sm.State{}, fmt.Errorf("failed to bootstrap node with new state: %w", err)

@@ -272,6 +306,11 @@
		return sm.State{}, fmt.Errorf("failed to store last seen commit: %w", err)
	}

	err = r.Backfill(ctx, state)
	if err != nil {
		r.Logger.Error("backfill failed. Proceeding optimistically...", "err", err)
	}

	return state, nil
}

@@ -279,7 +318,7 @@
// order. It does not stop verifying blocks until reaching a block with a height
// and time that is less or equal to the stopHeight and stopTime. The
// trustedBlockID should be of the header at startHeight.
func (r *Reactor) Backfill(state sm.State) error {
func (r *Reactor) Backfill(ctx context.Context, state sm.State) error {
	params := state.ConsensusParams.Evidence
	stopHeight := state.LastBlockHeight - params.MaxAgeNumBlocks
	stopTime := state.LastBlockTime.Add(-params.MaxAgeDuration)

@@ -290,7 +329,7 @@
		stopTime = state.LastBlockTime
	}
	return r.backfill(
		context.Background(),
		ctx,
		state.ChainID,
		state.LastBlockHeight,
		stopHeight,

@@ -308,12 +347,12 @@ func (r *Reactor) backfill(
	stopTime time.Time,
) error {
	r.Logger.Info("starting backfill process...", "startHeight", startHeight,
		"stopHeight", stopHeight, "trustedBlockID", trustedBlockID)
		"stopHeight", stopHeight, "stopTime", stopTime, "trustedBlockID", trustedBlockID)

	const sleepTime = 1 * time.Second
	var (
		lastValidatorSet *types.ValidatorSet
		lastChangeHeight int64 = startHeight
		lastChangeHeight = startHeight
	)

	queue := newBlockQueue(startHeight, stopHeight, initialHeight, stopTime, maxLightBlockRequestRetries)

@@ -330,8 +369,10 @@
		for {
			select {
			case height := <-queue.nextHeight():
				r.Logger.Debug("fetching next block", "height", height)
				lb, peer, err := r.dispatcher.LightBlock(ctxWithCancel, height)
				peer := r.peers.Pop(ctx)
				r.Logger.Debug("fetching next block", "height", height, "peer", peer)
				lb, err := r.dispatcher.LightBlock(ctxWithCancel, height, peer)
				r.peers.Append(peer)
				if errors.Is(err, context.Canceled) {
					return
				}

@@ -353,7 +394,7 @@
					queue.retry(height)
					// As we are fetching blocks backwards, if this node doesn't have the block it likely doesn't
					// have any prior ones, thus we remove it from the peer list.
					r.dispatcher.removePeer(peer)
					r.peers.Remove(peer)
					continue
				}
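Taken together, the Sync-related hunks above reshape the flow roughly as follows. This is an illustrative outline in the spirit of the diff, not the literal method body; the mutex handling and syncer construction are compressed into comments.

```go
// Condensed sketch of the reworked Sync control flow (illustration only).
func (r *Reactor) syncOutline(ctx context.Context) (sm.State, error) {
	// 1. Wait for at least two peers, then build the state provider from config
	//    (P2P block providers or RPC servers).
	r.waitForEnoughPeers(ctx, 2)
	if err := r.initStateProvider(ctx, r.chainID, r.initialHeight); err != nil {
		return sm.State{}, err
	}

	// 2. Discover, fetch and apply a snapshot (the real code constructs
	//    r.syncer from r.stateProvider at this point, guarded by r.mtx).
	state, commit, err := r.syncer.SyncAny(ctx, r.cfg.DiscoveryTime, func() {})
	if err != nil {
		return sm.State{}, err
	}

	// 3. Persist the restored state (and the seen commit) so consensus or
	//    blocksync can start from the snapshot height.
	if err := r.stateStore.Bootstrap(state); err != nil {
		return sm.State{}, err
	}
	_ = commit

	// 4. Backfill historical light blocks; failure here is logged, not fatal.
	if err := r.Backfill(ctx, state); err != nil {
		r.Logger.Error("backfill failed. Proceeding optimistically...", "err", err)
	}
	return state, nil
}
```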
@@ -450,12 +491,6 @@
		}
	}

// Dispatcher exposes the dispatcher so that a state provider can use it for
// light client verification
func (r *Reactor) Dispatcher() *dispatcher { //nolint:golint
	return r.dispatcher
}

// handleSnapshotMessage handles envelopes sent from peers on the
// SnapshotChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage.

@@ -498,7 +533,7 @@ func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error {
			return nil
		}

		logger.Debug("received snapshot", "height", msg.Height, "format", msg.Format)
		logger.Info("received snapshot", "height", msg.Height, "format", msg.Format)
		_, err := r.syncer.AddSnapshot(envelope.From, &snapshot{
			Height: msg.Height,
			Format: msg.Format,

@@ -516,6 +551,7 @@ func (r *Reactor) handleSnapshotMessage(envelope p2p.Envelope) error {
			)
			return nil
		}
		logger.Info("added snapshot", "height", msg.Height, "format", msg.Format)

	default:
		return fmt.Errorf("received unknown message: %T", msg)

@@ -623,6 +659,15 @@ func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error {
			r.Logger.Error("failed to retrieve light block", "err", err, "height", msg.Height)
			return err
		}
		if lb == nil {
			r.blockCh.Out <- p2p.Envelope{
				To: envelope.From,
				Message: &ssproto.LightBlockResponse{
					LightBlock: nil,
				},
			}
			return nil
		}

		lbproto, err := lb.ToProto()
		if err != nil {

@@ -640,8 +685,56 @@ func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error {
		}

	case *ssproto.LightBlockResponse:
		if err := r.dispatcher.respond(msg.LightBlock, envelope.From); err != nil {
			r.Logger.Error("error processing light block response", "err", err)
		var height int64 = 0
		if msg.LightBlock != nil {
			height = msg.LightBlock.SignedHeader.Header.Height
		}
		r.Logger.Info("received light block response", "peer", envelope.From, "height", height)
		if err := r.dispatcher.Respond(msg.LightBlock, envelope.From); err != nil {
			r.Logger.Error("error processing light block response", "err", err, "height", height)
		}

	default:
		return fmt.Errorf("received unknown message: %T", msg)
	}

	return nil
}

func (r *Reactor) handleParamsMessage(envelope p2p.Envelope) error {
	switch msg := envelope.Message.(type) {
	case *ssproto.ParamsRequest:
		r.Logger.Debug("received consensus params request", "height", msg.Height)
		cp, err := r.stateStore.LoadConsensusParams(int64(msg.Height))
		if err != nil {
			r.Logger.Error("failed to fetch requested consensus params", "err", err, "height", msg.Height)
			return nil
		}

		cpproto := cp.ToProto()
		r.paramsCh.Out <- p2p.Envelope{
			To: envelope.From,
			Message: &ssproto.ParamsResponse{
				Height: msg.Height,
				ConsensusParams: cpproto,
			},
		}

	case *ssproto.ParamsResponse:
		r.mtx.RLock()
		defer r.mtx.RUnlock()
		r.Logger.Debug("received consensus params response", "height", msg.Height)

		cp := types.ConsensusParamsFromProto(msg.ConsensusParams)

		if sp, ok := r.stateProvider.(*stateProviderP2P); ok {
			r.Logger.Debug("passing along message")
			select {
			case sp.paramsRecvCh <- cp:
			default:
			}
		} else {
			r.Logger.Debug("received unexpected params response; using RPC state provider", "peer", envelope.From)
		}

	default:

@@ -678,6 +771,9 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
	case LightBlockChannel:
		err = r.handleLightBlockMessage(envelope)

	case ParamsChannel:
		err = r.handleParamsMessage(envelope)

	default:
		err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
	}

@@ -703,6 +799,10 @@ func (r *Reactor) processBlockCh() {
	r.processCh(r.blockCh, "light block")
}

func (r *Reactor) processParamsCh() {
	r.processCh(r.paramsCh, "consensus params")
}

// processCh routes state sync messages to their respective handlers. Any error
// encountered during message execution will result in a PeerError being sent on
// the respective channel. When the reactor is stopped, we will catch the signal
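The ParamsChannel handler above gives the p2p state provider a way to obtain consensus params without an RPC endpoint. A condensed sketch of the requesting side of that round trip, assumed to sit in the statesync package; the message and constant names are taken from the hunks above, while the receive channel is an approximation of the provider's `paramsRecvCh` plumbing.

```go
// fetchParams is a sketch of the provider side: send a ParamsRequest to a peer
// and wait for handleParamsMessage to forward the matching response.
func fetchParams(paramsOut chan<- p2p.Envelope, recv <-chan types.ConsensusParams,
	peer types.NodeID, height uint64) (types.ConsensusParams, error) {

	paramsOut <- p2p.Envelope{
		To:      peer,
		Message: &ssproto.ParamsRequest{Height: height},
	}

	select {
	case cp := <-recv: // filled by handleParamsMessage via the provider's receive channel
		return cp, nil
	case <-time.After(consensusParamsResponseTimeout):
		return types.ConsensusParams{}, errors.New("timed out waiting for consensus params")
	}
}
```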
@@ -732,24 +832,38 @@ func (r *Reactor) processCh(ch *p2p.Channel, chName string) {
// processPeerUpdate processes a PeerUpdate, returning an error upon failing to
// handle the PeerUpdate or if a panic is recovered.
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
	r.Logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)

	r.mtx.RLock()
	defer r.mtx.RUnlock()
	r.Logger.Info("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)

	switch peerUpdate.Status {
	case p2p.PeerStatusUp:
		if r.syncer != nil {
			r.syncer.AddPeer(peerUpdate.NodeID)
		r.peers.Append(peerUpdate.NodeID)
	case p2p.PeerStatusDown:
		r.peers.Remove(peerUpdate.NodeID)
	}

	r.mtx.Lock()
	if r.syncer == nil {
		r.mtx.Unlock()
		return
	}
	defer r.mtx.Unlock()

	switch peerUpdate.Status {
	case p2p.PeerStatusUp:
		newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher)
		r.providers[peerUpdate.NodeID] = newProvider
		r.syncer.AddPeer(peerUpdate.NodeID)
		if sp, ok := r.stateProvider.(*stateProviderP2P); ok {
			// we do this in a separate routine to not block whilst waiting for the light client to finish
			// whatever call it's currently executing
			go sp.addProvider(newProvider)
		}
		r.dispatcher.addPeer(peerUpdate.NodeID)

	case p2p.PeerStatusDown:
		if r.syncer != nil {
			r.syncer.RemovePeer(peerUpdate.NodeID)
		}
		r.dispatcher.removePeer(peerUpdate.NodeID)
		delete(r.providers, peerUpdate.NodeID)
		r.syncer.RemovePeer(peerUpdate.NodeID)
	}
	r.Logger.Info("processed peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
}

// processPeerUpdates initiates a blocking process where we listen for and handle

@@ -839,5 +953,46 @@ func (r *Reactor) fetchLightBlock(height uint64) (*types.LightBlock, error) {
		},
		ValidatorSet: vals,
	}, nil

}

func (r *Reactor) waitForEnoughPeers(ctx context.Context, numPeers int) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(200 * time.Millisecond):
			if r.peers.Len() >= numPeers {
				return
			}
		}
	}
}

func (r *Reactor) initStateProvider(ctx context.Context, chainID string, initialHeight int64) error {
	var err error
	to := light.TrustOptions{
		Period: r.cfg.TrustPeriod,
		Height: r.cfg.TrustHeight,
		Hash: r.cfg.TrustHashBytes(),
	}
	spLogger := r.Logger.With("module", "stateprovider")

	if r.cfg.UseP2P {
		peers := r.peers.All()
		providers := make([]provider.Provider, len(peers))
		for idx, p := range peers {
			providers[idx] = NewBlockProvider(p, chainID, r.dispatcher)
		}

		r.stateProvider, err = NewP2PStateProvider(ctx, chainID, initialHeight, providers, to, r.paramsCh.Out, spLogger)
		if err != nil {
			return err
		}
	} else {
		r.stateProvider, err = NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.RPCServers, to, spLogger)
		if err != nil {
			return err
		}
	}
	return nil
}
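For reference, the light client trust options used by initStateProvider map directly onto the new config fields. A small fragment illustrating that mapping, assuming the `config` and `light` packages are imported as in the reactor above; the hash literal is a placeholder that would normally be copied in hex form from a trusted source.

```go
// Sketch: deriving light client trust options from the state sync config.
cfg := config.DefaultStateSyncConfig()
cfg.Enable = true
cfg.UseP2P = true
cfg.TrustHeight = 1
cfg.TrustHash = "F0E4C2F76C58916EC258F246851BEA091D14D4247A2FC3E18694461B1816E13B" // placeholder

to := light.TrustOptions{
	Period: cfg.TrustPeriod,
	Height: cfg.TrustHeight,
	Hash:   cfg.TrustHashBytes(), // hex-decoded form of trust-hash
}
_ = to
```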
@@ -3,6 +3,7 @@ package statesync

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"testing"
	"time"

@@ -21,6 +22,7 @@
	"github.com/tendermint/tendermint/light/provider"
	ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
	tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
	"github.com/tendermint/tendermint/proxy"
	proxymocks "github.com/tendermint/tendermint/proxy/mocks"
	smmocks "github.com/tendermint/tendermint/state/mocks"
	"github.com/tendermint/tendermint/store"

@@ -50,6 +52,11 @@ type reactorTestSuite struct {
	blockOutCh chan p2p.Envelope
	blockPeerErrCh chan p2p.PeerError

	paramsChannel *p2p.Channel
	paramsInCh chan p2p.Envelope
	paramsOutCh chan p2p.Envelope
	paramsPeerErrCh chan p2p.PeerError

	peerUpdateCh chan p2p.PeerUpdate
	peerUpdates *p2p.PeerUpdates

@@ -86,6 +93,9 @@ func setup(
		blockInCh: make(chan p2p.Envelope, chBuf),
		blockOutCh: make(chan p2p.Envelope, chBuf),
		blockPeerErrCh: make(chan p2p.PeerError, chBuf),
		paramsInCh: make(chan p2p.Envelope, chBuf),
		paramsOutCh: make(chan p2p.Envelope, chBuf),
		paramsPeerErrCh: make(chan p2p.PeerError, chBuf),
		conn: conn,
		connQuery: connQuery,
		stateProvider: stateProvider,

@@ -118,12 +128,22 @@ func setup(
		rts.blockPeerErrCh,
	)

	rts.paramsChannel = p2p.NewChannel(
		ParamsChannel,
		new(ssproto.Message),
		rts.paramsInCh,
		rts.paramsOutCh,
		rts.paramsPeerErrCh,
	)

	rts.stateStore = &smmocks.Store{}
	rts.blockStore = store.NewBlockStore(dbm.NewMemDB())

	cfg := config.DefaultStateSyncConfig()

	rts.reactor = NewReactor(
		factory.DefaultTestChainID,
		1,
		*cfg,
		log.TestingLogger(),
		conn,

@@ -131,6 +151,7 @@ func setup(
		rts.snapshotChannel,
		rts.chunkChannel,
		rts.blockChannel,
		rts.paramsChannel,
		rts.peerUpdates,
		rts.stateStore,
		rts.blockStore,

@@ -138,7 +159,8 @@ func setup(
	)

	// override the dispatcher with one with a shorter timeout
	rts.reactor.dispatcher = newDispatcher(rts.blockChannel.Out, 1*time.Second)
	rts.reactor.dispatcher = NewDispatcher(rts.blockChannel.Out,
		100*time.Millisecond)

	rts.syncer = newSyncer(
		*cfg,
@@ -162,6 +184,58 @@ func setup(
|
||||
return rts
|
||||
}
|
||||
|
||||
func TestReactor_Sync(t *testing.T) {
|
||||
var snapshotHeight int64 = 7
|
||||
rts := setup(t, nil, nil, nil, 2)
|
||||
chain := buildLightBlockChain(t, 1, 10, time.Now())
|
||||
// app accepts any snapshot
|
||||
rts.conn.On("OfferSnapshotSync", ctx, mock.AnythingOfType("types.RequestOfferSnapshot")).
|
||||
Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ACCEPT}, nil)
|
||||
|
||||
// app accepts every chunk
|
||||
rts.conn.On("ApplySnapshotChunkSync", ctx, mock.AnythingOfType("types.RequestApplySnapshotChunk")).
|
||||
Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
|
||||
|
||||
// app query returns valid state app hash
|
||||
rts.connQuery.On("InfoSync", ctx, proxy.RequestInfo).Return(&abci.ResponseInfo{
|
||||
AppVersion: 9,
|
||||
LastBlockHeight: snapshotHeight,
|
||||
LastBlockAppHash: chain[snapshotHeight+1].AppHash,
|
||||
}, nil)
|
||||
|
||||
// store accepts state and validator sets
|
||||
rts.stateStore.On("Bootstrap", mock.AnythingOfType("state.State")).Return(nil)
|
||||
rts.stateStore.On("SaveValidatorSets", mock.AnythingOfType("int64"), mock.AnythingOfType("int64"),
|
||||
mock.AnythingOfType("*types.ValidatorSet")).Return(nil)
|
||||
|
||||
closeCh := make(chan struct{})
|
||||
defer close(closeCh)
|
||||
go handleLightBlockRequests(t, chain, rts.blockOutCh,
|
||||
rts.blockInCh, closeCh, 0)
|
||||
go graduallyAddPeers(rts.peerUpdateCh, closeCh, 1*time.Second)
|
||||
go handleSnapshotRequests(t, rts.snapshotOutCh, rts.snapshotInCh, closeCh, []snapshot{
|
||||
{
|
||||
Height: uint64(snapshotHeight),
|
||||
Format: 1,
|
||||
Chunks: 1,
|
||||
},
|
||||
})
|
||||
|
||||
go handleChunkRequests(t, rts.chunkOutCh, rts.chunkInCh, closeCh, []byte("abc"))
|
||||
|
||||
go handleConsensusParamsRequest(t, rts.paramsOutCh, rts.paramsInCh, closeCh)
|
||||
|
||||
// update the config to use the p2p provider
|
||||
rts.reactor.cfg.UseP2P = true
|
||||
rts.reactor.cfg.TrustHeight = 1
|
||||
rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())
|
||||
rts.reactor.cfg.DiscoveryTime = 1 * time.Second
|
||||
|
||||
// Run state sync
|
||||
_, err := rts.reactor.Sync(context.Background())
|
||||
require.NoError(t, err)
|
||||
}
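Outside of tests, the same switches exercised above map onto the node's StateSyncConfig. A minimal, illustrative sketch (assuming the config and time packages are imported; the trust height and hash below are placeholders an operator would obtain out of band, not real values):

ssCfg := config.DefaultStateSyncConfig()
ssCfg.Enable = true
ssCfg.UseP2P = true // fetch light blocks and consensus params over the P2P layer instead of RPC
ssCfg.TrustHeight = 100 // a height the operator trusts (placeholder)
ssCfg.TrustHash = "A1B2C3..." // hash of the header at TrustHeight (placeholder)
ssCfg.DiscoveryTime = 15 * time.Second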
|
||||
|
||||
func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) {
|
||||
rts := setup(t, nil, nil, nil, 2)
|
||||
|
||||
@@ -370,7 +444,7 @@ func TestReactor_LightBlockResponse(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestReactor_Dispatcher(t *testing.T) {
|
||||
func TestReactor_BlockProviders(t *testing.T) {
|
||||
rts := setup(t, nil, nil, nil, 2)
|
||||
rts.peerUpdateCh <- p2p.PeerUpdate{
|
||||
NodeID: types.NodeID("aa"),
|
||||
@@ -387,9 +461,13 @@ func TestReactor_Dispatcher(t *testing.T) {
|
||||
chain := buildLightBlockChain(t, 1, 10, time.Now())
|
||||
go handleLightBlockRequests(t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
|
||||
|
||||
dispatcher := rts.reactor.Dispatcher()
|
||||
providers := dispatcher.Providers(factory.DefaultTestChainID, 5*time.Second)
|
||||
require.Len(t, providers, 2)
|
||||
peers := rts.reactor.peers.All()
|
||||
require.Len(t, peers, 2)
|
||||
|
||||
providers := make([]provider.Provider, len(peers))
|
||||
for idx, peer := range peers {
|
||||
providers[idx] = NewBlockProvider(peer, factory.DefaultTestChainID, rts.reactor.dispatcher)
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
@@ -416,6 +494,59 @@ func TestReactor_Dispatcher(t *testing.T) {
|
||||
t.Fail()
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestReactor_StateProviderP2P(t *testing.T) {
|
||||
rts := setup(t, nil, nil, nil, 2)
|
||||
// make the syncer non-nil, otherwise the reactor won't report that it is state syncing
|
||||
rts.reactor.syncer = rts.syncer
|
||||
peerA := types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength))
|
||||
peerB := types.NodeID(strings.Repeat("b", 2*types.NodeIDByteLength))
|
||||
rts.peerUpdateCh <- p2p.PeerUpdate{
|
||||
NodeID: peerA,
|
||||
Status: p2p.PeerStatusUp,
|
||||
}
|
||||
rts.peerUpdateCh <- p2p.PeerUpdate{
|
||||
NodeID: peerB,
|
||||
Status: p2p.PeerStatusUp,
|
||||
}
|
||||
|
||||
closeCh := make(chan struct{})
|
||||
defer close(closeCh)
|
||||
|
||||
chain := buildLightBlockChain(t, 1, 10, time.Now())
|
||||
go handleLightBlockRequests(t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
|
||||
go handleConsensusParamsRequest(t, rts.paramsOutCh, rts.paramsInCh, closeCh)
|
||||
|
||||
rts.reactor.cfg.UseP2P = true
|
||||
rts.reactor.cfg.TrustHeight = 1
|
||||
rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())
|
||||
ctx := context.Background()
|
||||
rts.reactor.mtx.Lock()
|
||||
err := rts.reactor.initStateProvider(ctx, factory.DefaultTestChainID, 1)
|
||||
rts.reactor.mtx.Unlock()
|
||||
require.NoError(t, err)
|
||||
rts.reactor.syncer.stateProvider = rts.reactor.stateProvider
|
||||
|
||||
appHash, err := rts.reactor.stateProvider.AppHash(ctx, 5)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, appHash, 32)
|
||||
|
||||
state, err := rts.reactor.stateProvider.State(ctx, 5)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, appHash, state.AppHash)
|
||||
require.Equal(t, types.DefaultConsensusParams(), &state.ConsensusParams)
|
||||
|
||||
commit, err := rts.reactor.stateProvider.Commit(ctx, 5)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, commit.BlockID, state.LastBlockID)
|
||||
|
||||
added, err := rts.reactor.syncer.AddSnapshot(peerA, &snapshot{
|
||||
Height: 1, Format: 2, Chunks: 7, Hash: []byte{1, 2}, Metadata: []byte{1},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.True(t, added)
|
||||
}
|
||||
|
||||
func TestReactor_Backfill(t *testing.T) {
|
||||
@@ -494,7 +625,6 @@ func retryUntil(t *testing.T, fn func() bool, timeout time.Duration) {
|
||||
if fn() {
|
||||
return
|
||||
}
|
||||
|
||||
require.NoError(t, ctx.Err())
|
||||
}
|
||||
}
|
||||
@@ -523,7 +653,9 @@ func handleLightBlockRequests(t *testing.T,
|
||||
} else {
|
||||
switch errorCount % 3 {
|
||||
case 0: // send a different block
|
||||
differntLB, err := mockLB(t, int64(msg.Height), factory.DefaultTestTime, factory.MakeBlockID()).ToProto()
|
||||
vals, pv := factory.RandValidatorSet(3, 10)
|
||||
_, _, lb := mockLB(t, int64(msg.Height), factory.DefaultTestTime, factory.MakeBlockID(), vals, pv)
|
||||
differntLB, err := lb.ToProto()
|
||||
require.NoError(t, err)
|
||||
sending <- p2p.Envelope{
|
||||
From: envelope.To,
|
||||
@@ -550,37 +682,142 @@ func handleLightBlockRequests(t *testing.T,
|
||||
}
|
||||
}
|
||||
|
||||
func handleConsensusParamsRequest(t *testing.T, receiving, sending chan p2p.Envelope, closeCh chan struct{}) {
|
||||
params := types.DefaultConsensusParams()
|
||||
paramsProto := params.ToProto()
|
||||
for {
|
||||
select {
|
||||
case envelope := <-receiving:
|
||||
t.Log("received consensus params request")
|
||||
msg, ok := envelope.Message.(*ssproto.ParamsRequest)
|
||||
require.True(t, ok)
|
||||
sending <- p2p.Envelope{
|
||||
From: envelope.To,
|
||||
Message: &ssproto.ParamsResponse{
|
||||
Height: msg.Height,
|
||||
ConsensusParams: paramsProto,
|
||||
},
|
||||
}
|
||||
|
||||
case <-closeCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func buildLightBlockChain(t *testing.T, fromHeight, toHeight int64, startTime time.Time) map[int64]*types.LightBlock {
|
||||
chain := make(map[int64]*types.LightBlock, toHeight-fromHeight)
|
||||
lastBlockID := factory.MakeBlockID()
|
||||
blockTime := startTime.Add(-5 * time.Minute)
|
||||
blockTime := startTime.Add(time.Duration(fromHeight-toHeight) * time.Minute)
|
||||
vals, pv := factory.RandValidatorSet(3, 10)
|
||||
for height := fromHeight; height < toHeight; height++ {
|
||||
chain[height] = mockLB(t, height, blockTime, lastBlockID)
|
||||
vals, pv, chain[height] = mockLB(t, height, blockTime, lastBlockID, vals, pv)
|
||||
lastBlockID = factory.MakeBlockIDWithHash(chain[height].Header.Hash())
|
||||
blockTime = blockTime.Add(1 * time.Minute)
|
||||
}
|
||||
return chain
|
||||
}
|
||||
|
||||
func mockLB(t *testing.T, height int64, time time.Time,
|
||||
lastBlockID types.BlockID) *types.LightBlock {
|
||||
func mockLB(t *testing.T, height int64, time time.Time, lastBlockID types.BlockID,
|
||||
currentVals *types.ValidatorSet, currentPrivVals []types.PrivValidator,
|
||||
) (*types.ValidatorSet, []types.PrivValidator, *types.LightBlock) {
|
||||
header, err := factory.MakeHeader(&types.Header{
|
||||
Height: height,
|
||||
LastBlockID: lastBlockID,
|
||||
Time: time,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
vals, pv := factory.RandValidatorSet(3, 10)
|
||||
header.ValidatorsHash = vals.Hash()
|
||||
nextVals, nextPrivVals := factory.RandValidatorSet(3, 10)
|
||||
header.ValidatorsHash = currentVals.Hash()
|
||||
header.NextValidatorsHash = nextVals.Hash()
|
||||
header.ConsensusHash = types.DefaultConsensusParams().HashConsensusParams()
|
||||
lastBlockID = factory.MakeBlockIDWithHash(header.Hash())
|
||||
voteSet := types.NewVoteSet(factory.DefaultTestChainID, height, 0, tmproto.PrecommitType, vals)
|
||||
commit, err := factory.MakeCommit(lastBlockID, height, 0, voteSet, pv, time)
|
||||
voteSet := types.NewVoteSet(factory.DefaultTestChainID, height, 0, tmproto.PrecommitType, currentVals)
|
||||
commit, err := factory.MakeCommit(lastBlockID, height, 0, voteSet, currentPrivVals, time)
|
||||
require.NoError(t, err)
|
||||
return &types.LightBlock{
|
||||
return nextVals, nextPrivVals, &types.LightBlock{
|
||||
SignedHeader: &types.SignedHeader{
|
||||
Header: header,
|
||||
Commit: commit,
|
||||
},
|
||||
ValidatorSet: vals,
|
||||
ValidatorSet: currentVals,
|
||||
}
|
||||
}
|
||||
|
||||
func graduallyAddPeers(
|
||||
peerUpdateCh chan p2p.PeerUpdate,
|
||||
closeCh chan struct{},
|
||||
interval time.Duration,
|
||||
) {
|
||||
ticker := time.NewTicker(interval)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
peerUpdateCh <- p2p.PeerUpdate{
|
||||
NodeID: factory.RandomNodeID(),
|
||||
Status: p2p.PeerStatusUp,
|
||||
}
|
||||
case <-closeCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func handleSnapshotRequests(
|
||||
t *testing.T,
|
||||
receivingCh chan p2p.Envelope,
|
||||
sendingCh chan p2p.Envelope,
|
||||
closeCh chan struct{},
|
||||
snapshots []snapshot,
|
||||
) {
|
||||
for {
|
||||
select {
|
||||
case envelope := <-receivingCh:
|
||||
_, ok := envelope.Message.(*ssproto.SnapshotsRequest)
|
||||
require.True(t, ok)
|
||||
for _, snapshot := range snapshots {
|
||||
sendingCh <- p2p.Envelope{
|
||||
From: envelope.To,
|
||||
Message: &ssproto.SnapshotsResponse{
|
||||
Height: snapshot.Height,
|
||||
Format: snapshot.Format,
|
||||
Chunks: snapshot.Chunks,
|
||||
Hash: snapshot.Hash,
|
||||
Metadata: snapshot.Metadata,
|
||||
},
|
||||
}
|
||||
}
|
||||
case <-closeCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func handleChunkRequests(
|
||||
t *testing.T,
|
||||
receivingCh chan p2p.Envelope,
|
||||
sendingCh chan p2p.Envelope,
|
||||
closeCh chan struct{},
|
||||
chunk []byte,
|
||||
) {
|
||||
for {
|
||||
select {
|
||||
case envelope := <-receivingCh:
|
||||
msg, ok := envelope.Message.(*ssproto.ChunkRequest)
|
||||
require.True(t, ok)
|
||||
sendingCh <- p2p.Envelope{
|
||||
From: envelope.To,
|
||||
Message: &ssproto.ChunkResponse{
|
||||
Height: msg.Height,
|
||||
Format: msg.Format,
|
||||
Index: msg.Index,
|
||||
Chunk: chunk,
|
||||
Missing: false,
|
||||
},
|
||||
}
|
||||
|
||||
case <-closeCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,13 +1,11 @@
|
||||
package statesync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
@@ -43,8 +41,6 @@ func (s *snapshot) Key() snapshotKey {
|
||||
|
||||
// snapshotPool discovers and aggregates snapshots across peers.
|
||||
type snapshotPool struct {
|
||||
stateProvider StateProvider
|
||||
|
||||
tmsync.Mutex
|
||||
snapshots map[snapshotKey]*snapshot
|
||||
snapshotPeers map[snapshotKey]map[types.NodeID]types.NodeID
|
||||
@@ -61,9 +57,8 @@ type snapshotPool struct {
|
||||
}
|
||||
|
||||
// newSnapshotPool creates a new snapshot pool.
|
||||
func newSnapshotPool(stateProvider StateProvider) *snapshotPool {
|
||||
func newSnapshotPool() *snapshotPool {
|
||||
return &snapshotPool{
|
||||
stateProvider: stateProvider,
|
||||
snapshots: make(map[snapshotKey]*snapshot),
|
||||
snapshotPeers: make(map[snapshotKey]map[types.NodeID]types.NodeID),
|
||||
formatIndex: make(map[uint32]map[snapshotKey]bool),
|
||||
@@ -80,14 +75,6 @@ func newSnapshotPool(stateProvider StateProvider) *snapshotPool {
|
||||
// snapshot height is verified using the light client, and the expected app hash
|
||||
// is set for the snapshot.
|
||||
func (p *snapshotPool) Add(peerID types.NodeID, snapshot *snapshot) (bool, error) {
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
appHash, err := p.stateProvider.AppHash(ctx, snapshot.Height)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to get app hash: %w", err)
|
||||
}
|
||||
snapshot.trustedAppHash = appHash
|
||||
key := snapshot.Key()
|
||||
|
||||
p.Lock()
|
||||
|
||||
@@ -3,10 +3,8 @@ package statesync
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/internal/statesync/mocks"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
@@ -39,13 +37,10 @@ func TestSnapshot_Key(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSnapshotPool_Add(t *testing.T) {
|
||||
stateProvider := &mocks.StateProvider{}
|
||||
stateProvider.On("AppHash", mock.Anything, uint64(1)).Return([]byte("app_hash"), nil)
|
||||
|
||||
peerID := types.NodeID("aa")
|
||||
|
||||
// Adding to the pool should work
|
||||
pool := newSnapshotPool(stateProvider)
|
||||
pool := newSnapshotPool()
|
||||
added, err := pool.Add(peerID, &snapshot{
|
||||
Height: 1,
|
||||
Format: 1,
|
||||
@@ -66,18 +61,12 @@ func TestSnapshotPool_Add(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.False(t, added)
|
||||
|
||||
// The pool should have populated the snapshot with the trusted app hash
|
||||
snapshot := pool.Best()
|
||||
require.NotNil(t, snapshot)
|
||||
require.Equal(t, []byte("app_hash"), snapshot.trustedAppHash)
|
||||
|
||||
stateProvider.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestSnapshotPool_GetPeer(t *testing.T) {
|
||||
stateProvider := &mocks.StateProvider{}
|
||||
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
|
||||
pool := newSnapshotPool(stateProvider)
|
||||
pool := newSnapshotPool()
|
||||
|
||||
s := &snapshot{Height: 1, Format: 1, Chunks: 1, Hash: []byte{1}}
|
||||
|
||||
@@ -112,9 +101,7 @@ func TestSnapshotPool_GetPeer(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSnapshotPool_GetPeers(t *testing.T) {
|
||||
stateProvider := &mocks.StateProvider{}
|
||||
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
|
||||
pool := newSnapshotPool(stateProvider)
|
||||
pool := newSnapshotPool()
|
||||
|
||||
s := &snapshot{Height: 1, Format: 1, Chunks: 1, Hash: []byte{1}}
|
||||
|
||||
@@ -137,9 +124,7 @@ func TestSnapshotPool_GetPeers(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSnapshotPool_Ranked_Best(t *testing.T) {
|
||||
stateProvider := &mocks.StateProvider{}
|
||||
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
|
||||
pool := newSnapshotPool(stateProvider)
|
||||
pool := newSnapshotPool()
|
||||
|
||||
// snapshots in expected order (best to worst). Highest height wins, then highest format.
|
||||
// Snapshots with different chunk hashes are considered different, and the most peers is
|
||||
@@ -182,9 +167,7 @@ func TestSnapshotPool_Ranked_Best(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSnapshotPool_Reject(t *testing.T) {
|
||||
stateProvider := &mocks.StateProvider{}
|
||||
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
|
||||
pool := newSnapshotPool(stateProvider)
|
||||
pool := newSnapshotPool()
|
||||
|
||||
peerID := types.NodeID("aa")
|
||||
|
||||
@@ -212,9 +195,7 @@ func TestSnapshotPool_Reject(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSnapshotPool_RejectFormat(t *testing.T) {
|
||||
stateProvider := &mocks.StateProvider{}
|
||||
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
|
||||
pool := newSnapshotPool(stateProvider)
|
||||
pool := newSnapshotPool()
|
||||
|
||||
peerID := types.NodeID("aa")
|
||||
|
||||
@@ -243,9 +224,7 @@ func TestSnapshotPool_RejectFormat(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSnapshotPool_RejectPeer(t *testing.T) {
|
||||
stateProvider := &mocks.StateProvider{}
|
||||
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
|
||||
pool := newSnapshotPool(stateProvider)
|
||||
pool := newSnapshotPool()
|
||||
|
||||
peerAID := types.NodeID("aa")
|
||||
peerBID := types.NodeID("bb")
|
||||
@@ -285,9 +264,7 @@ func TestSnapshotPool_RejectPeer(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSnapshotPool_RemovePeer(t *testing.T) {
|
||||
stateProvider := &mocks.StateProvider{}
|
||||
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
|
||||
pool := newSnapshotPool(stateProvider)
|
||||
pool := newSnapshotPool()
|
||||
|
||||
peerAID := types.NodeID("aa")
|
||||
peerBID := types.NodeID("bb")
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package statesync
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -9,21 +11,25 @@ import (
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
|
||||
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
|
||||
"github.com/tendermint/tendermint/internal/p2p"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/light"
|
||||
lightprovider "github.com/tendermint/tendermint/light/provider"
|
||||
lighthttp "github.com/tendermint/tendermint/light/provider/http"
|
||||
lightrpc "github.com/tendermint/tendermint/light/rpc"
|
||||
lightdb "github.com/tendermint/tendermint/light/store/db"
|
||||
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
|
||||
rpchttp "github.com/tendermint/tendermint/rpc/client/http"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
"github.com/tendermint/tendermint/version"
|
||||
)
|
||||
|
||||
//go:generate ../../scripts/mockery_generate.sh StateProvider
|
||||
|
||||
// StateProvider is a provider of trusted state data for bootstrapping a node. This refers
|
||||
// to the state.State object, not the state machine.
|
||||
// to the state.State object, not the state machine. There are two implementations. One
|
||||
// uses the P2P layer and the other uses the RPC layer. Both use light client verification.
|
||||
type StateProvider interface {
|
||||
// AppHash returns the app hash after the given height has been committed.
|
||||
AppHash(ctx context.Context, height uint64) ([]byte, error)
|
||||
@@ -33,20 +39,17 @@ type StateProvider interface {
|
||||
State(ctx context.Context, height uint64) (sm.State, error)
|
||||
}
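For orientation, a hedged sketch of how a consumer such as the syncer exercises this interface when restoring a snapshot; the function lives notionally inside the statesync package, and its name and error handling are illustrative, not part of this change:

// bootstrapFromSnapshot is a hypothetical helper showing the typical call order.
func bootstrapFromSnapshot(ctx context.Context, sp StateProvider, height uint64) (sm.State, *types.Commit, error) {
	// the trusted app hash is later compared against what the ABCI app reports
	appHash, err := sp.AppHash(ctx, height)
	if err != nil {
		return sm.State{}, nil, err
	}
	_ = appHash

	// build the post-snapshot state and the commit used to seed block sync
	state, err := sp.State(ctx, height)
	if err != nil {
		return sm.State{}, nil, err
	}
	commit, err := sp.Commit(ctx, height)
	if err != nil {
		return sm.State{}, nil, err
	}
	return state, commit, nil
}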
|
||||
|
||||
// lightClientStateProvider is a state provider using the light client.
|
||||
type lightClientStateProvider struct {
|
||||
type stateProviderRPC struct {
|
||||
tmsync.Mutex // light.Client is not concurrency-safe
|
||||
lc *light.Client
|
||||
version sm.Version
|
||||
initialHeight int64
|
||||
providers map[lightprovider.Provider]string
|
||||
}
|
||||
|
||||
// NewLightClientStateProvider creates a new StateProvider using a light client and RPC clients.
|
||||
func NewLightClientStateProvider(
|
||||
// NewRPCStateProvider creates a new StateProvider using a light client and RPC clients.
|
||||
func NewRPCStateProvider(
|
||||
ctx context.Context,
|
||||
chainID string,
|
||||
version sm.Version,
|
||||
initialHeight int64,
|
||||
servers []string,
|
||||
trustOptions light.TrustOptions,
|
||||
@@ -75,51 +78,16 @@ func NewLightClientStateProvider(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &lightClientStateProvider{
|
||||
return &stateProviderRPC{
|
||||
lc: lc,
|
||||
version: version,
|
||||
initialHeight: initialHeight,
|
||||
providers: providerRemotes,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewLightClientStateProviderFromDispatcher creates a light client state
|
||||
// provider but uses a p2p-connected dispatcher instead of RPC endpoints
|
||||
func NewLightClientStateProviderFromDispatcher(
|
||||
ctx context.Context,
|
||||
chainID string,
|
||||
version sm.Version,
|
||||
initialHeight int64,
|
||||
dispatcher *dispatcher,
|
||||
trustOptions light.TrustOptions,
|
||||
logger log.Logger,
|
||||
) (StateProvider, error) {
|
||||
providers := dispatcher.Providers(chainID, 30*time.Second)
|
||||
if len(providers) < 2 {
|
||||
return nil, fmt.Errorf("at least 2 peers are required, got %d", len(providers))
|
||||
}
|
||||
|
||||
providersMap := make(map[lightprovider.Provider]string)
|
||||
for _, p := range providers {
|
||||
providersMap[p] = p.(*blockProvider).String()
|
||||
}
|
||||
|
||||
lc, err := light.NewClient(ctx, chainID, trustOptions, providers[0], providers[1:],
|
||||
lightdb.New(dbm.NewMemDB()), light.Logger(logger))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &lightClientStateProvider{
|
||||
lc: lc,
|
||||
version: version,
|
||||
initialHeight: initialHeight,
|
||||
providers: providersMap,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// AppHash implements StateProvider.
|
||||
func (s *lightClientStateProvider) AppHash(ctx context.Context, height uint64) ([]byte, error) {
|
||||
func (s *stateProviderRPC) AppHash(ctx context.Context, height uint64) ([]byte, error) {
|
||||
fmt.Println("requesting app hash")
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
@@ -128,27 +96,19 @@ func (s *lightClientStateProvider) AppHash(ctx context.Context, height uint64) (
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// We also try to fetch the blocks at height H and H+2, since we need these
|
||||
|
||||
// We also try to fetch the blocks at H+2, since we need these
|
||||
// when building the state while restoring the snapshot. This avoids the race
|
||||
// condition where we try to restore a snapshot before H+2 exists.
|
||||
//
|
||||
// FIXME This is a hack, since we can't add new methods to the interface without
|
||||
// breaking it. We should instead have a Has(ctx, height) method which checks
|
||||
// that the state provider has access to the necessary data for the height.
|
||||
// We piggyback on AppHash() since it's called when adding snapshots to the pool.
|
||||
_, err = s.lc.VerifyLightBlockAtHeight(ctx, int64(height+2), time.Now())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = s.lc.VerifyLightBlockAtHeight(ctx, int64(height), time.Now())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return header.AppHash, nil
|
||||
}
|
||||
|
||||
// Commit implements StateProvider.
|
||||
func (s *lightClientStateProvider) Commit(ctx context.Context, height uint64) (*types.Commit, error) {
|
||||
func (s *stateProviderRPC) Commit(ctx context.Context, height uint64) (*types.Commit, error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
header, err := s.lc.VerifyLightBlockAtHeight(ctx, int64(height), time.Now())
|
||||
@@ -159,13 +119,12 @@ func (s *lightClientStateProvider) Commit(ctx context.Context, height uint64) (*
|
||||
}
|
||||
|
||||
// State implements StateProvider.
|
||||
func (s *lightClientStateProvider) State(ctx context.Context, height uint64) (sm.State, error) {
|
||||
func (s *stateProviderRPC) State(ctx context.Context, height uint64) (sm.State, error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
state := sm.State{
|
||||
ChainID: s.lc.ChainID(),
|
||||
Version: s.version,
|
||||
InitialHeight: s.initialHeight,
|
||||
}
|
||||
if state.InitialHeight == 0 {
|
||||
@@ -193,6 +152,10 @@ func (s *lightClientStateProvider) State(ctx context.Context, height uint64) (sm
|
||||
return sm.State{}, err
|
||||
}
|
||||
|
||||
state.Version = sm.Version{
|
||||
Consensus: currentLightBlock.Version,
|
||||
Software: version.TMVersion,
|
||||
}
|
||||
state.LastBlockHeight = lastLightBlock.Height
|
||||
state.LastBlockTime = lastLightBlock.Time
|
||||
state.LastBlockID = lastLightBlock.Commit.BlockID
|
||||
@@ -235,3 +198,186 @@ func rpcClient(server string) (*rpchttp.HTTP, error) {
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
type stateProviderP2P struct {
|
||||
tmsync.Mutex // light.Client is not concurrency-safe
|
||||
lc *light.Client
|
||||
initialHeight int64
|
||||
paramsSendCh chan<- p2p.Envelope
|
||||
paramsRecvCh chan types.ConsensusParams
|
||||
}
|
||||
|
||||
// NewP2PStateProvider creates a light client state
|
||||
// provider but uses a dispatcher connected to the P2P layer
|
||||
func NewP2PStateProvider(
|
||||
ctx context.Context,
|
||||
chainID string,
|
||||
initialHeight int64,
|
||||
providers []lightprovider.Provider,
|
||||
trustOptions light.TrustOptions,
|
||||
paramsSendCh chan<- p2p.Envelope,
|
||||
logger log.Logger,
|
||||
) (StateProvider, error) {
|
||||
if len(providers) < 2 {
|
||||
return nil, fmt.Errorf("at least 2 peers are required, got %d", len(providers))
|
||||
}
|
||||
|
||||
logger.Info("initializing p2p state provider", "providers", len(providers))
|
||||
|
||||
lc, err := light.NewClient(ctx, chainID, trustOptions, providers[0], providers[1:],
|
||||
lightdb.New(dbm.NewMemDB()), light.Logger(logger))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &stateProviderP2P{
|
||||
lc: lc,
|
||||
initialHeight: initialHeight,
|
||||
paramsSendCh: paramsSendCh,
|
||||
paramsRecvCh: make(chan types.ConsensusParams),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// AppHash implements StateProvider.
|
||||
func (s *stateProviderP2P) AppHash(ctx context.Context, height uint64) ([]byte, error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
// We have to fetch the next height, which contains the app hash for the previous height.
|
||||
header, err := s.lc.VerifyLightBlockAtHeight(ctx, int64(height+1), time.Now())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// We also try to fetch the block at H+2, since we need it
|
||||
// when building the state while restoring the snapshot. This avoids the race
|
||||
// condition where we try to restore a snapshot before H+2 exists.
|
||||
_, err = s.lc.VerifyLightBlockAtHeight(ctx, int64(height+2), time.Now())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return header.AppHash, nil
|
||||
}
|
||||
|
||||
// Commit implements StateProvider.
|
||||
func (s *stateProviderP2P) Commit(ctx context.Context, height uint64) (*types.Commit, error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
header, err := s.lc.VerifyLightBlockAtHeight(ctx, int64(height), time.Now())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return header.Commit, nil
|
||||
}
|
||||
|
||||
// State implements StateProvider.
|
||||
func (s *stateProviderP2P) State(ctx context.Context, height uint64) (sm.State, error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
state := sm.State{
|
||||
ChainID: s.lc.ChainID(),
|
||||
InitialHeight: s.initialHeight,
|
||||
}
|
||||
if state.InitialHeight == 0 {
|
||||
state.InitialHeight = 1
|
||||
}
|
||||
|
||||
// The snapshot height maps onto the state heights as follows:
|
||||
//
|
||||
// height: last block, i.e. the snapshotted height
|
||||
// height+1: current block, i.e. the first block we'll process after the snapshot
|
||||
// height+2: next block, i.e. the second block after the snapshot
|
||||
//
|
||||
// We need to fetch the NextValidators from height+2 because if the application changed
|
||||
// the validator set at the snapshot height then this only takes effect at height+2.
|
||||
lastLightBlock, err := s.lc.VerifyLightBlockAtHeight(ctx, int64(height), time.Now())
|
||||
if err != nil {
|
||||
return sm.State{}, err
|
||||
}
|
||||
currentLightBlock, err := s.lc.VerifyLightBlockAtHeight(ctx, int64(height+1), time.Now())
|
||||
if err != nil {
|
||||
return sm.State{}, err
|
||||
}
|
||||
nextLightBlock, err := s.lc.VerifyLightBlockAtHeight(ctx, int64(height+2), time.Now())
|
||||
if err != nil {
|
||||
return sm.State{}, err
|
||||
}
|
||||
|
||||
state.Version = sm.Version{
|
||||
Consensus: currentLightBlock.Version,
|
||||
Software: version.TMVersion,
|
||||
}
|
||||
state.LastBlockHeight = lastLightBlock.Height
|
||||
state.LastBlockTime = lastLightBlock.Time
|
||||
state.LastBlockID = lastLightBlock.Commit.BlockID
|
||||
state.AppHash = currentLightBlock.AppHash
|
||||
state.LastResultsHash = currentLightBlock.LastResultsHash
|
||||
state.LastValidators = lastLightBlock.ValidatorSet
|
||||
state.Validators = currentLightBlock.ValidatorSet
|
||||
state.NextValidators = nextLightBlock.ValidatorSet
|
||||
state.LastHeightValidatorsChanged = nextLightBlock.Height
|
||||
|
||||
// We'll also need to fetch consensus params via P2P.
|
||||
state.ConsensusParams, err = s.consensusParams(ctx, currentLightBlock.Height)
|
||||
if err != nil {
|
||||
return sm.State{}, err
|
||||
}
|
||||
// validate the consensus params
|
||||
if !bytes.Equal(nextLightBlock.ConsensusHash, state.ConsensusParams.HashConsensusParams()) {
|
||||
return sm.State{}, fmt.Errorf("consensus params hash mismatch at height %d. Expected %v, got %v",
|
||||
currentLightBlock.Height, nextLightBlock.ConsensusHash, state.ConsensusParams.HashConsensusParams())
|
||||
}
|
||||
// set the last height changed to the current height
|
||||
state.LastHeightConsensusParamsChanged = currentLightBlock.Height
|
||||
|
||||
return state, nil
|
||||
}
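As a concrete illustration of the height mapping described above (numbers illustrative only): for a snapshot taken at height 100, the provider verifies light blocks 100, 101 and 102; the restored state takes LastBlockHeight, LastBlockTime, LastBlockID and LastValidators from 100, AppHash, LastResultsHash and Validators from 101, and NextValidators from 102.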
|
||||
|
||||
// addProvider dynamically adds a peer as a new witness. A limit of 6 providers is kept as a
|
||||
// heuristic: too many providers overburden the network, while too few weaken the second layer of security.
|
||||
func (s *stateProviderP2P) addProvider(p lightprovider.Provider) {
|
||||
if len(s.lc.Witnesses()) < 6 {
|
||||
s.lc.AddProvider(p)
|
||||
}
|
||||
}
|
||||
|
||||
// consensusParams sends out a request for consensus params blocking until one is returned.
|
||||
// If it fails to get a valid set of consensus params from any of the providers it returns an error.
|
||||
func (s *stateProviderP2P) consensusParams(ctx context.Context, height int64) (types.ConsensusParams, error) {
|
||||
providers := s.lc.Witnesses()
|
||||
for _, provider := range providers {
|
||||
p, ok := provider.(*BlockProvider)
|
||||
if !ok {
|
||||
panic("expected p2p state provider to use p2p block providers")
|
||||
}
|
||||
|
||||
// extract the nodeID of the provider
|
||||
peer, err := types.NewNodeID(p.String())
|
||||
if err != nil {
|
||||
return types.ConsensusParams{}, fmt.Errorf("invalid provider node id: %w, provider: %s", err, p.String())
|
||||
}
|
||||
|
||||
select {
|
||||
case s.paramsSendCh <- p2p.Envelope{
|
||||
To: peer,
|
||||
Message: &ssproto.ParamsRequest{
|
||||
Height: uint64(height),
|
||||
},
|
||||
}:
|
||||
case <-ctx.Done():
|
||||
return types.ConsensusParams{}, ctx.Err()
|
||||
}
|
||||
|
||||
select {
|
||||
// if we get no response from this provider we move on to the next one
|
||||
case <-time.After(consensusParamsResponseTimeout):
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
return types.ConsensusParams{}, ctx.Err()
|
||||
case params := <-s.paramsRecvCh:
|
||||
return params, nil
|
||||
}
|
||||
}
|
||||
return types.ConsensusParams{}, errors.New("unable to fetch consensus params from connected providers")
|
||||
}
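The receiving half of this request/response pair is not shown in this hunk; presumably the reactor, on receiving a ParamsResponse, converts it and pushes it into paramsRecvCh. A hedged sketch of what that glue might look like inside the statesync package (the deliverParams helper is an assumption, not code from this change):

// deliverParams is a hypothetical helper; the actual reactor wiring may differ.
func (s *stateProviderP2P) deliverParams(ctx context.Context, resp *ssproto.ParamsResponse) error {
	// convert the proto params and hand them to the blocked consensusParams call
	params := types.ConsensusParamsFromProto(resp.ConsensusParams)
	select {
	case s.paramsRecvCh <- params:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}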
|
||||
|
||||
@@ -40,14 +40,11 @@ var (
|
||||
errRejectSender = errors.New("snapshot sender was rejected")
|
||||
// errVerifyFailed is returned by Sync() when app hash or last height
|
||||
// verification fails.
|
||||
errVerifyFailed = errors.New("verification failed")
|
||||
errVerifyFailed = errors.New("verification with app failed")
|
||||
// errTimeout is returned by Sync() when we've waited too long to receive a chunk.
|
||||
errTimeout = errors.New("timed out waiting for chunk")
|
||||
// errNoSnapshots is returned by SyncAny() if no snapshots are found and discovery is disabled.
|
||||
errNoSnapshots = errors.New("no suitable snapshots found")
|
||||
// errStateCommitTimeout is returned by Sync() when the timeout for retrieving
|
||||
// tendermint state or the commit is exceeded
|
||||
errStateCommitTimeout = errors.New("timed out trying to retrieve state and commit")
|
||||
)
|
||||
|
||||
// syncer runs a state sync against an ABCI app. Use either SyncAny() to automatically attempt to
|
||||
@@ -84,7 +81,7 @@ func newSyncer(
|
||||
stateProvider: stateProvider,
|
||||
conn: conn,
|
||||
connQuery: connQuery,
|
||||
snapshots: newSnapshotPool(stateProvider),
|
||||
snapshots: newSnapshotPool(),
|
||||
snapshotCh: snapshotCh,
|
||||
chunkCh: chunkCh,
|
||||
tempDir: tempDir,
|
||||
@@ -153,7 +150,6 @@ func (s *syncer) SyncAny(
|
||||
discoveryTime time.Duration,
|
||||
requestSnapshots func(),
|
||||
) (sm.State, *types.Commit, error) {
|
||||
|
||||
if discoveryTime != 0 && discoveryTime < minimumDiscoveryTime {
|
||||
discoveryTime = minimumDiscoveryTime
|
||||
}
|
||||
@@ -181,7 +177,6 @@ func (s *syncer) SyncAny(
|
||||
if discoveryTime == 0 {
|
||||
return sm.State{}, nil, errNoSnapshots
|
||||
}
|
||||
requestSnapshots()
|
||||
s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
|
||||
time.Sleep(discoveryTime)
|
||||
continue
|
||||
@@ -230,10 +225,6 @@ func (s *syncer) SyncAny(
|
||||
s.logger.Info("Snapshot sender rejected", "peer", peer)
|
||||
}
|
||||
|
||||
case errors.Is(err, errStateCommitTimeout):
|
||||
s.logger.Info("Timed out retrieving state and commit, rejecting and retrying...", "height", snapshot.Height)
|
||||
s.snapshots.Reject(snapshot)
|
||||
|
||||
default:
|
||||
return sm.State{}, nil, fmt.Errorf("snapshot restoration failed: %w", err)
|
||||
}
|
||||
@@ -264,8 +255,24 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueu
|
||||
s.mtx.Unlock()
|
||||
}()
|
||||
|
||||
hctx, hcancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer hcancel()
|
||||
|
||||
// Fetch the app hash corresponding to the snapshot
|
||||
appHash, err := s.stateProvider.AppHash(hctx, snapshot.Height)
|
||||
if err != nil {
|
||||
// check if the main context was triggered
|
||||
if ctx.Err() != nil {
|
||||
return sm.State{}, nil, ctx.Err()
|
||||
}
|
||||
s.logger.Info("failed to fetch and verify the app hash. Dropping snapshot and trying again",
|
||||
"err", err, "height", snapshot.Height)
|
||||
return sm.State{}, nil, errRejectSnapshot
|
||||
}
|
||||
snapshot.trustedAppHash = appHash
|
||||
|
||||
// Offer snapshot to ABCI app.
|
||||
err := s.offerSnapshot(ctx, snapshot)
|
||||
err = s.offerSnapshot(ctx, snapshot)
|
||||
if err != nil {
|
||||
return sm.State{}, nil, err
|
||||
}
|
||||
@@ -277,27 +284,29 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueu
|
||||
go s.fetchChunks(fetchCtx, snapshot, chunks)
|
||||
}
|
||||
|
||||
pctx, pcancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
pctx, pcancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||
defer pcancel()
|
||||
|
||||
// Optimistically build new state, so we don't discover any light client failures at the end.
|
||||
state, err := s.stateProvider.State(pctx, snapshot.Height)
|
||||
if err != nil {
|
||||
// check if the provider context exceeded the 10 second deadline
|
||||
if err == context.DeadlineExceeded && ctx.Err() == nil {
|
||||
return sm.State{}, nil, errStateCommitTimeout
|
||||
// check if the main context was triggered
|
||||
if ctx.Err() != nil {
|
||||
return sm.State{}, nil, ctx.Err()
|
||||
}
|
||||
|
||||
return sm.State{}, nil, fmt.Errorf("failed to build new state: %w", err)
|
||||
s.logger.Info("failed to get and verify tendermint state. Dropping snapshot and trying again",
|
||||
"err", err, "height", snapshot.Height)
|
||||
return sm.State{}, nil, errRejectSnapshot
|
||||
}
|
||||
commit, err := s.stateProvider.Commit(pctx, snapshot.Height)
|
||||
if err != nil {
|
||||
// check if the provider context exceeded the 10 second deadline
|
||||
if err == context.DeadlineExceeded && ctx.Err() == nil {
|
||||
return sm.State{}, nil, errStateCommitTimeout
|
||||
if ctx.Err() != nil {
|
||||
return sm.State{}, nil, ctx.Err()
|
||||
}
|
||||
|
||||
return sm.State{}, nil, fmt.Errorf("failed to fetch commit: %w", err)
|
||||
s.logger.Info("failed to get and verify commit. Dropping snapshot and trying again",
|
||||
"err", err, "height", snapshot.Height)
|
||||
return sm.State{}, nil, errRejectSnapshot
|
||||
}
|
||||
|
||||
// Restore snapshot
|
||||
|
||||
@@ -12,3 +12,7 @@ func TestMakeHeader(t *testing.T) {
|
||||
_, err := MakeHeader(&types.Header{})
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestRandomNodeID(t *testing.T) {
|
||||
assert.NotPanics(t, func() { RandomNodeID() })
|
||||
}
|
||||
|
||||
internal/test/factory/p2p.go (new file, 25 lines)
@@ -0,0 +1,25 @@
|
||||
package factory
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"strings"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/rand"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
func NodeID(str string) types.NodeID {
|
||||
id, err := types.NewNodeID(strings.Repeat(str, 2*types.NodeIDByteLength))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
func RandomNodeID() types.NodeID {
|
||||
id, err := types.NewNodeID(hex.EncodeToString(rand.Bytes(types.NodeIDByteLength)))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return id
|
||||
}
|
||||
@@ -989,7 +989,7 @@ func (c *Client) findNewPrimary(ctx context.Context, height int64, remove bool)
|
||||
c.providerMutex.Lock()
|
||||
defer c.providerMutex.Unlock()
|
||||
|
||||
if len(c.witnesses) <= 1 {
|
||||
if len(c.witnesses) < 1 {
|
||||
return nil, ErrNoWitnesses
|
||||
}
|
||||
|
||||
|
||||
@@ -724,51 +724,32 @@ func TestClient_BackwardsVerification(t *testing.T) {
|
||||
|
||||
}
|
||||
{
|
||||
testCases := []struct {
|
||||
headers map[int64]*types.SignedHeader
|
||||
vals map[int64]*types.ValidatorSet
|
||||
}{
|
||||
{
|
||||
// 7) provides incorrect height
|
||||
headers: map[int64]*types.SignedHeader{
|
||||
2: keys.GenSignedHeader(chainID, 1, bTime.Add(30*time.Minute), nil, vals, vals,
|
||||
hash("app_hash"), hash("cons_hash"), hash("results_hash"), 0, len(keys)),
|
||||
3: h3,
|
||||
},
|
||||
vals: valSet,
|
||||
},
|
||||
{
|
||||
// 8) provides incorrect hash
|
||||
headers: map[int64]*types.SignedHeader{
|
||||
2: keys.GenSignedHeader(chainID, 2, bTime.Add(30*time.Minute), nil, vals, vals,
|
||||
hash("app_hash2"), hash("cons_hash23"), hash("results_hash30"), 0, len(keys)),
|
||||
3: h3,
|
||||
},
|
||||
vals: valSet,
|
||||
},
|
||||
// 8) provides incorrect hash
|
||||
headers := map[int64]*types.SignedHeader{
|
||||
2: keys.GenSignedHeader(chainID, 2, bTime.Add(30*time.Minute), nil, vals, vals,
|
||||
hash("app_hash2"), hash("cons_hash23"), hash("results_hash30"), 0, len(keys)),
|
||||
3: h3,
|
||||
}
|
||||
vals := valSet
|
||||
mockNode := mockNodeFromHeadersAndVals(headers, vals)
|
||||
c, err := light.NewClient(
|
||||
ctx,
|
||||
chainID,
|
||||
light.TrustOptions{
|
||||
Period: 1 * time.Hour,
|
||||
Height: 3,
|
||||
Hash: h3.Hash(),
|
||||
},
|
||||
mockNode,
|
||||
[]provider.Provider{mockNode},
|
||||
dbs.New(dbm.NewMemDB()),
|
||||
light.Logger(log.TestingLogger()),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
for idx, tc := range testCases {
|
||||
mockNode := mockNodeFromHeadersAndVals(tc.headers, tc.vals)
|
||||
c, err := light.NewClient(
|
||||
ctx,
|
||||
chainID,
|
||||
light.TrustOptions{
|
||||
Period: 1 * time.Hour,
|
||||
Height: 3,
|
||||
Hash: h3.Hash(),
|
||||
},
|
||||
mockNode,
|
||||
[]provider.Provider{mockNode},
|
||||
dbs.New(dbm.NewMemDB()),
|
||||
light.Logger(log.TestingLogger()),
|
||||
)
|
||||
require.NoError(t, err, idx)
|
||||
|
||||
_, err = c.VerifyLightBlockAtHeight(ctx, 2, bTime.Add(1*time.Hour).Add(1*time.Second))
|
||||
assert.Error(t, err, idx)
|
||||
mockNode.AssertExpectations(t)
|
||||
}
|
||||
_, err = c.VerifyLightBlockAtHeight(ctx, 2, bTime.Add(1*time.Hour).Add(1*time.Second))
|
||||
assert.Error(t, err)
|
||||
mockNode.AssertExpectations(t)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
node/node.go (141 lines changed)
@@ -28,7 +28,6 @@ import (
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
"github.com/tendermint/tendermint/libs/strings"
|
||||
tmtime "github.com/tendermint/tendermint/libs/time"
|
||||
"github.com/tendermint/tendermint/light"
|
||||
"github.com/tendermint/tendermint/privval"
|
||||
tmgrpc "github.com/tendermint/tendermint/privval/grpc"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
@@ -328,6 +327,8 @@ func makeNode(config *cfg.Config,
|
||||
}
|
||||
|
||||
stateSyncReactor = statesync.NewReactor(
|
||||
genDoc.ChainID,
|
||||
genDoc.InitialHeight,
|
||||
*config.StateSync,
|
||||
stateSyncReactorShim.Logger,
|
||||
proxyApp.Snapshot(),
|
||||
@@ -335,6 +336,7 @@ func makeNode(config *cfg.Config,
|
||||
channels[statesync.SnapshotChannel],
|
||||
channels[statesync.ChunkChannel],
|
||||
channels[statesync.LightBlockChannel],
|
||||
channels[statesync.ParamsChannel],
|
||||
peerUpdates,
|
||||
stateStore,
|
||||
blockStore,
|
||||
@@ -671,6 +673,8 @@ func (n *nodeImpl) OnStart() error {
|
||||
}
|
||||
|
||||
// Run state sync
|
||||
// TODO: We shouldn't run state sync if we already have state that has a
|
||||
// LastBlockHeight that is not InitialHeight
|
||||
if n.stateSync {
|
||||
bcR, ok := n.bcReactor.(cs.BlockSyncReactor)
|
||||
if !ok {
|
||||
@@ -683,17 +687,52 @@ func (n *nodeImpl) OnStart() error {
|
||||
return fmt.Errorf("unable to derive state: %w", err)
|
||||
}
|
||||
|
||||
ssc := n.config.StateSync
|
||||
sp, err := constructStateProvider(ssc, state, n.Logger.With("module", "light"))
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to set up light client state provider: %w", err)
|
||||
// TODO: we may want to move these events within the respective
|
||||
// reactors.
|
||||
// At the start of state sync we use the initial height as the event height because
// state sync does not have a concrete state height until a snapshot has been fetched.
|
||||
d := types.EventDataStateSyncStatus{Complete: false, Height: state.InitialHeight}
|
||||
if err := n.eventBus.PublishEventStateSyncStatus(d); err != nil {
|
||||
n.eventBus.Logger.Error("failed to emit the statesync start event", "err", err)
|
||||
}
|
||||
|
||||
if err := startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, sp,
|
||||
ssc, n.config.FastSyncMode, state.InitialHeight, n.eventBus); err != nil {
|
||||
return fmt.Errorf("failed to start state sync: %w", err)
|
||||
}
|
||||
// FIXME: We shouldn't allow state sync to silently error out without
|
||||
// bubbling up the error and gracefully shutting down the rest of the node
|
||||
go func() {
|
||||
n.Logger.Info("starting state sync")
|
||||
state, err := n.stateSyncReactor.Sync(context.TODO())
|
||||
if err != nil {
|
||||
n.Logger.Error("state sync failed", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
n.consensusReactor.SetStateSyncingMetrics(0)
|
||||
|
||||
d := types.EventDataStateSyncStatus{Complete: true, Height: state.LastBlockHeight}
|
||||
if err := n.eventBus.PublishEventStateSyncStatus(d); err != nil {
|
||||
n.eventBus.Logger.Error("failed to emit the statesync start event", "err", err)
|
||||
}
|
||||
|
||||
// TODO: Some form of orchestrator is needed here between the state
|
||||
// advancing reactors to be able to control which one of the three
|
||||
// is running
|
||||
if n.config.FastSyncMode {
|
||||
// FIXME Very ugly to have these metrics bleed through here.
|
||||
n.consensusReactor.SetBlockSyncingMetrics(1)
|
||||
if err := bcR.SwitchToBlockSync(state); err != nil {
|
||||
n.Logger.Error("failed to switch to block sync", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
d := types.EventDataBlockSyncStatus{Complete: false, Height: state.LastBlockHeight}
|
||||
if err := n.eventBus.PublishEventBlockSyncStatus(d); err != nil {
|
||||
n.eventBus.Logger.Error("failed to emit the block sync starting event", "err", err)
|
||||
}
|
||||
|
||||
} else {
|
||||
n.consensusReactor.SwitchToConsensus(state, true)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -978,67 +1017,6 @@ func (n *nodeImpl) NodeInfo() types.NodeInfo {
|
||||
return n.nodeInfo
|
||||
}
|
||||
|
||||
// startStateSync starts an asynchronous state sync process, then switches to block sync mode.
|
||||
func startStateSync(
|
||||
ssR statesync.SyncReactor,
|
||||
bcR cs.BlockSyncReactor,
|
||||
conR cs.ConsSyncReactor,
|
||||
sp statesync.StateProvider,
|
||||
config *cfg.StateSyncConfig,
|
||||
blockSync bool,
|
||||
stateInitHeight int64,
|
||||
eb *types.EventBus,
|
||||
) error {
|
||||
stateSyncLogger := eb.Logger.With("module", "statesync")
|
||||
|
||||
stateSyncLogger.Info("starting state sync...")
|
||||
|
||||
// at the start of state sync we use the initial height as the event height because
// state sync does not have a concrete state height until a snapshot has been fetched.
|
||||
d := types.EventDataStateSyncStatus{Complete: false, Height: stateInitHeight}
|
||||
if err := eb.PublishEventStateSyncStatus(d); err != nil {
|
||||
stateSyncLogger.Error("failed to emit the statesync start event", "err", err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
state, err := ssR.Sync(context.TODO(), sp, config.DiscoveryTime)
|
||||
if err != nil {
|
||||
stateSyncLogger.Error("state sync failed", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := ssR.Backfill(state); err != nil {
|
||||
stateSyncLogger.Error("backfill failed; node has insufficient history to verify all evidence;"+
|
||||
" proceeding optimistically...", "err", err)
|
||||
}
|
||||
|
||||
conR.SetStateSyncingMetrics(0)
|
||||
|
||||
d := types.EventDataStateSyncStatus{Complete: true, Height: state.LastBlockHeight}
|
||||
if err := eb.PublishEventStateSyncStatus(d); err != nil {
|
||||
stateSyncLogger.Error("failed to emit the statesync start event", "err", err)
|
||||
}
|
||||
|
||||
if blockSync {
|
||||
// FIXME Very ugly to have these metrics bleed through here.
|
||||
conR.SetBlockSyncingMetrics(1)
|
||||
if err := bcR.SwitchToBlockSync(state); err != nil {
|
||||
stateSyncLogger.Error("failed to switch to block sync", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
d := types.EventDataBlockSyncStatus{Complete: false, Height: state.LastBlockHeight}
|
||||
if err := eb.PublishEventBlockSyncStatus(d); err != nil {
|
||||
stateSyncLogger.Error("failed to emit the block sync starting event", "err", err)
|
||||
}
|
||||
|
||||
} else {
|
||||
conR.SwitchToConsensus(state, true)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
// genesisDocProvider returns a GenesisDoc.
|
||||
// It allows the GenesisDoc to be pulled from sources other than the
|
||||
// filesystem, for instance from a distributed key-value store cluster.
|
||||
@@ -1221,24 +1199,3 @@ func getChannelsFromShim(reactorShim *p2p.ReactorShim) map[p2p.ChannelID]*p2p.Ch
|
||||
|
||||
return channels
|
||||
}
|
||||
|
||||
func constructStateProvider(
|
||||
ssc *cfg.StateSyncConfig,
|
||||
state sm.State,
|
||||
logger log.Logger,
|
||||
) (statesync.StateProvider, error) {
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
to := light.TrustOptions{
|
||||
Period: ssc.TrustPeriod,
|
||||
Height: ssc.TrustHeight,
|
||||
Hash: ssc.TrustHashBytes(),
|
||||
}
|
||||
|
||||
return statesync.NewLightClientStateProvider(
|
||||
ctx,
|
||||
state.ChainID, state.Version, state.InitialHeight,
|
||||
ssc.RPCServers, to, logger,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -21,16 +21,12 @@ import (
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
"github.com/tendermint/tendermint/crypto/tmhash"
|
||||
consmocks "github.com/tendermint/tendermint/internal/consensus/mocks"
|
||||
ssmocks "github.com/tendermint/tendermint/internal/statesync/mocks"
|
||||
|
||||
"github.com/tendermint/tendermint/internal/evidence"
|
||||
"github.com/tendermint/tendermint/internal/mempool"
|
||||
mempoolv0 "github.com/tendermint/tendermint/internal/mempool/v0"
|
||||
statesync "github.com/tendermint/tendermint/internal/statesync"
|
||||
"github.com/tendermint/tendermint/internal/test/factory"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
|
||||
tmrand "github.com/tendermint/tendermint/libs/rand"
|
||||
tmtime "github.com/tendermint/tendermint/libs/time"
|
||||
"github.com/tendermint/tendermint/privval"
|
||||
@@ -669,65 +665,3 @@ func loadStatefromGenesis(t *testing.T) sm.State {
|
||||
|
||||
return state
|
||||
}
|
||||
|
||||
func TestNodeStartStateSync(t *testing.T) {
|
||||
mockSSR := &statesync.MockSyncReactor{}
|
||||
mockFSR := &consmocks.BlockSyncReactor{}
|
||||
mockCSR := &consmocks.ConsSyncReactor{}
|
||||
mockSP := &ssmocks.StateProvider{}
|
||||
state := loadStatefromGenesis(t)
|
||||
config := cfg.ResetTestRoot("load_state_from_genesis")
|
||||
|
||||
eventBus, err := createAndStartEventBus(log.TestingLogger())
|
||||
defer func() {
|
||||
err := eventBus.Stop()
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, eventBus)
|
||||
|
||||
sub, err := eventBus.Subscribe(context.Background(), "test-client", types.EventQueryStateSyncStatus, 10)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, sub)
|
||||
|
||||
cfgSS := config.StateSync
|
||||
|
||||
mockSSR.On("Sync", context.TODO(), mockSP, cfgSS.DiscoveryTime).Return(state, nil).
|
||||
On("Backfill", state).Return(nil)
|
||||
mockCSR.On("SetStateSyncingMetrics", float64(0)).Return().
|
||||
On("SwitchToConsensus", state, true).Return()
|
||||
|
||||
require.NoError(t,
|
||||
startStateSync(mockSSR, mockFSR, mockCSR, mockSP, config.StateSync, false, state.InitialHeight, eventBus))
|
||||
|
||||
for cnt := 0; cnt < 2; {
|
||||
select {
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Errorf("StateSyncStatus timeout")
|
||||
case msg := <-sub.Out():
|
||||
if cnt == 0 {
|
||||
ensureStateSyncStatus(t, msg, false, state.InitialHeight)
|
||||
cnt++
|
||||
} else {
|
||||
// the state height is 0 because we do not actually update the state in this test
|
||||
ensureStateSyncStatus(t, msg, true, 0)
|
||||
cnt++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mockSSR.AssertNumberOfCalls(t, "Sync", 1)
|
||||
mockSSR.AssertNumberOfCalls(t, "Backfill", 1)
|
||||
mockCSR.AssertNumberOfCalls(t, "SetStateSyncingMetrics", 1)
|
||||
mockCSR.AssertNumberOfCalls(t, "SwitchToConsensus", 1)
|
||||
}
|
||||
|
||||
func ensureStateSyncStatus(t *testing.T, msg tmpubsub.Message, complete bool, height int64) {
|
||||
t.Helper()
|
||||
status, ok := msg.Data().(types.EventDataStateSyncStatus)
|
||||
|
||||
require.True(t, ok)
|
||||
require.Equal(t, complete, status.Complete)
|
||||
require.Equal(t, height, status.Height)
|
||||
}
|
||||
|
||||
@@ -706,6 +706,7 @@ func makeNodeInfo(
|
||||
byte(statesync.SnapshotChannel),
|
||||
byte(statesync.ChunkChannel),
|
||||
byte(statesync.LightBlockChannel),
|
||||
byte(statesync.ParamsChannel),
|
||||
},
|
||||
Moniker: config.Moniker,
|
||||
Other: types.NodeInfoOther{
|
||||
|
||||
@@ -28,6 +28,12 @@ func (m *Message) Wrap(pb proto.Message) error {
|
||||
case *LightBlockResponse:
|
||||
m.Sum = &Message_LightBlockResponse{LightBlockResponse: msg}
|
||||
|
||||
case *ParamsRequest:
|
||||
m.Sum = &Message_ParamsRequest{ParamsRequest: msg}
|
||||
|
||||
case *ParamsResponse:
|
||||
m.Sum = &Message_ParamsResponse{ParamsResponse: msg}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unknown message: %T", msg)
|
||||
}
|
||||
@@ -57,6 +63,12 @@ func (m *Message) Unwrap() (proto.Message, error) {
|
||||
case *Message_LightBlockResponse:
|
||||
return m.GetLightBlockResponse(), nil
|
||||
|
||||
case *Message_ParamsRequest:
|
||||
return m.GetParamsRequest(), nil
|
||||
|
||||
case *Message_ParamsResponse:
|
||||
return m.GetParamsResponse(), nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown message: %T", msg)
|
||||
}
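To illustrate the new oneof cases, a small example of round-tripping a ParamsRequest through Wrap and Unwrap, as a caller with ssproto imported might do in a test (illustrative only):

var msg ssproto.Message
if err := msg.Wrap(&ssproto.ParamsRequest{Height: 1}); err != nil {
	// handle error
}
unwrapped, err := msg.Unwrap()
if err != nil {
	// handle error
}
req, ok := unwrapped.(*ssproto.ParamsRequest) // ok is true, req.Height is 1
_ = req
_ = ok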
|
||||
@@ -106,6 +118,17 @@ func (m *Message) Validate() error {
|
||||
// light block validation handled by the backfill process
|
||||
case *Message_LightBlockResponse:
|
||||
|
||||
case *Message_ParamsRequest:
|
||||
if m.GetParamsRequest().Height == 0 {
|
||||
return errors.New("height cannot be 0")
|
||||
}
|
||||
|
||||
case *Message_ParamsResponse:
|
||||
resp := m.GetParamsResponse()
|
||||
if resp.Height == 0 {
|
||||
return errors.New("height cannot be 0")
|
||||
}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unknown message type: %T", msg)
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
|
||||
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
func TestValidateMsg(t *testing.T) {
|
||||
@@ -161,6 +162,35 @@ func TestStateSyncVectors(t *testing.T) {
|
||||
},
|
||||
"2214080110021803220c697427732061206368756e6b",
|
||||
},
|
||||
{
|
||||
"LightBlockRequest",
|
||||
&ssproto.LightBlockRequest{
|
||||
Height: 100,
|
||||
},
|
||||
"2a020864",
|
||||
},
|
||||
{
|
||||
"LightBlockResponse",
|
||||
&ssproto.LightBlockResponse{
|
||||
LightBlock: nil,
|
||||
},
|
||||
"3200",
|
||||
},
|
||||
{
|
||||
"ParamsRequest",
|
||||
&ssproto.ParamsRequest{
|
||||
Height: 9001,
|
||||
},
|
||||
"3a0308a946",
|
||||
},
|
||||
{
|
||||
"ParamsResponse",
|
||||
&ssproto.ParamsResponse{
|
||||
Height: 9001,
|
||||
ConsensusParams: types.DefaultConsensusParams().ToProto(),
|
||||
},
|
||||
"423408a946122f0a10088080c00a10ffffffffffffffffff01120e08a08d0612040880c60a188080401a090a07656432353531392200",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
|
||||
@@ -5,6 +5,7 @@ package statesync
|
||||
|
||||
import (
|
||||
fmt "fmt"
|
||||
_ "github.com/gogo/protobuf/gogoproto"
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
types "github.com/tendermint/tendermint/proto/tendermint/types"
|
||||
io "io"
|
||||
@@ -31,6 +32,8 @@ type Message struct {
|
||||
// *Message_ChunkResponse
|
||||
// *Message_LightBlockRequest
|
||||
// *Message_LightBlockResponse
|
||||
// *Message_ParamsRequest
|
||||
// *Message_ParamsResponse
|
||||
Sum isMessage_Sum `protobuf_oneof:"sum"`
|
||||
}
|
||||
|
||||
@@ -91,6 +94,12 @@ type Message_LightBlockRequest struct {
|
||||
type Message_LightBlockResponse struct {
|
||||
LightBlockResponse *LightBlockResponse `protobuf:"bytes,6,opt,name=light_block_response,json=lightBlockResponse,proto3,oneof" json:"light_block_response,omitempty"`
|
||||
}
|
||||
type Message_ParamsRequest struct {
|
||||
ParamsRequest *ParamsRequest `protobuf:"bytes,7,opt,name=params_request,json=paramsRequest,proto3,oneof" json:"params_request,omitempty"`
|
||||
}
|
||||
type Message_ParamsResponse struct {
|
||||
ParamsResponse *ParamsResponse `protobuf:"bytes,8,opt,name=params_response,json=paramsResponse,proto3,oneof" json:"params_response,omitempty"`
|
||||
}
|
||||
|
||||
func (*Message_SnapshotsRequest) isMessage_Sum() {}
|
||||
func (*Message_SnapshotsResponse) isMessage_Sum() {}
|
||||
@@ -98,6 +107,8 @@ func (*Message_ChunkRequest) isMessage_Sum() {}
|
||||
func (*Message_ChunkResponse) isMessage_Sum() {}
|
||||
func (*Message_LightBlockRequest) isMessage_Sum() {}
|
||||
func (*Message_LightBlockResponse) isMessage_Sum() {}
|
||||
func (*Message_ParamsRequest) isMessage_Sum() {}
|
||||
func (*Message_ParamsResponse) isMessage_Sum() {}
|
||||
|
||||
func (m *Message) GetSum() isMessage_Sum {
|
||||
if m != nil {
|
||||
@@ -148,6 +159,20 @@ func (m *Message) GetLightBlockResponse() *LightBlockResponse {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Message) GetParamsRequest() *ParamsRequest {
|
||||
if x, ok := m.GetSum().(*Message_ParamsRequest); ok {
|
||||
return x.ParamsRequest
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Message) GetParamsResponse() *ParamsResponse {
|
||||
if x, ok := m.GetSum().(*Message_ParamsResponse); ok {
|
||||
return x.ParamsResponse
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// XXX_OneofWrappers is for the internal use of the proto package.
|
||||
func (*Message) XXX_OneofWrappers() []interface{} {
|
||||
return []interface{}{
|
||||
@@ -157,6 +182,8 @@ func (*Message) XXX_OneofWrappers() []interface{} {
|
||||
(*Message_ChunkResponse)(nil),
|
||||
(*Message_LightBlockRequest)(nil),
|
||||
(*Message_LightBlockResponse)(nil),
|
||||
(*Message_ParamsRequest)(nil),
|
||||
(*Message_ParamsResponse)(nil),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -496,6 +523,102 @@ func (m *LightBlockResponse) GetLightBlock() *types.LightBlock {
|
||||
return nil
|
||||
}
|
||||
|
||||
type ParamsRequest struct {
|
||||
Height uint64 `protobuf:"varint,1,opt,name=height,proto3" json:"height,omitempty"`
|
||||
}
|
||||
|
||||
func (m *ParamsRequest) Reset() { *m = ParamsRequest{} }
|
||||
func (m *ParamsRequest) String() string { return proto.CompactTextString(m) }
|
||||
func (*ParamsRequest) ProtoMessage() {}
|
||||
func (*ParamsRequest) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_a1c2869546ca7914, []int{7}
|
||||
}
|
||||
func (m *ParamsRequest) XXX_Unmarshal(b []byte) error {
|
||||
return m.Unmarshal(b)
|
||||
}
|
||||
func (m *ParamsRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
if deterministic {
|
||||
return xxx_messageInfo_ParamsRequest.Marshal(b, m, deterministic)
|
||||
} else {
|
||||
b = b[:cap(b)]
|
||||
n, err := m.MarshalToSizedBuffer(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b[:n], nil
|
||||
}
|
||||
}
|
||||
func (m *ParamsRequest) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_ParamsRequest.Merge(m, src)
|
||||
}
|
||||
func (m *ParamsRequest) XXX_Size() int {
|
||||
return m.Size()
|
||||
}
|
||||
func (m *ParamsRequest) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_ParamsRequest.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_ParamsRequest proto.InternalMessageInfo
|
||||
|
||||
func (m *ParamsRequest) GetHeight() uint64 {
|
||||
if m != nil {
|
||||
return m.Height
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
type ParamsResponse struct {
|
||||
Height uint64 `protobuf:"varint,1,opt,name=height,proto3" json:"height,omitempty"`
|
||||
ConsensusParams types.ConsensusParams `protobuf:"bytes,2,opt,name=consensus_params,json=consensusParams,proto3" json:"consensus_params"`
|
||||
}
|
||||
|
||||
func (m *ParamsResponse) Reset() { *m = ParamsResponse{} }
|
||||
func (m *ParamsResponse) String() string { return proto.CompactTextString(m) }
|
||||
func (*ParamsResponse) ProtoMessage() {}
|
||||
func (*ParamsResponse) Descriptor() ([]byte, []int) {
|
||||
return fileDescriptor_a1c2869546ca7914, []int{8}
|
||||
}
|
||||
func (m *ParamsResponse) XXX_Unmarshal(b []byte) error {
|
||||
return m.Unmarshal(b)
|
||||
}
|
||||
func (m *ParamsResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
|
||||
if deterministic {
|
||||
return xxx_messageInfo_ParamsResponse.Marshal(b, m, deterministic)
|
||||
} else {
|
||||
b = b[:cap(b)]
|
||||
n, err := m.MarshalToSizedBuffer(b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b[:n], nil
|
||||
}
|
||||
}
|
||||
func (m *ParamsResponse) XXX_Merge(src proto.Message) {
|
||||
xxx_messageInfo_ParamsResponse.Merge(m, src)
|
||||
}
|
||||
func (m *ParamsResponse) XXX_Size() int {
|
||||
return m.Size()
|
||||
}
|
||||
func (m *ParamsResponse) XXX_DiscardUnknown() {
|
||||
xxx_messageInfo_ParamsResponse.DiscardUnknown(m)
|
||||
}
|
||||
|
||||
var xxx_messageInfo_ParamsResponse proto.InternalMessageInfo
|
||||
|
||||
func (m *ParamsResponse) GetHeight() uint64 {
|
||||
if m != nil {
|
||||
return m.Height
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *ParamsResponse) GetConsensusParams() types.ConsensusParams {
|
||||
if m != nil {
|
||||
return m.ConsensusParams
|
||||
}
|
||||
return types.ConsensusParams{}
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*Message)(nil), "tendermint.statesync.Message")
|
||||
proto.RegisterType((*SnapshotsRequest)(nil), "tendermint.statesync.SnapshotsRequest")
|
||||
@@ -504,43 +627,51 @@ func init() {
|
||||
proto.RegisterType((*ChunkResponse)(nil), "tendermint.statesync.ChunkResponse")
|
||||
proto.RegisterType((*LightBlockRequest)(nil), "tendermint.statesync.LightBlockRequest")
|
||||
proto.RegisterType((*LightBlockResponse)(nil), "tendermint.statesync.LightBlockResponse")
|
||||
proto.RegisterType((*ParamsRequest)(nil), "tendermint.statesync.ParamsRequest")
|
||||
proto.RegisterType((*ParamsResponse)(nil), "tendermint.statesync.ParamsResponse")
|
||||
}
|
||||
|
||||
func init() { proto.RegisterFile("tendermint/statesync/types.proto", fileDescriptor_a1c2869546ca7914) }
|
||||
|
||||
var fileDescriptor_a1c2869546ca7914 = []byte{
|
||||
// 485 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0x51, 0x6b, 0xd3, 0x50,
|
||||
0x14, 0x4e, 0x5c, 0xdb, 0x8d, 0xb3, 0x46, 0x96, 0x63, 0x91, 0x32, 0x46, 0x18, 0x11, 0x74, 0x20,
|
||||
0xa4, 0xa0, 0x8f, 0xe2, 0x4b, 0x7d, 0x99, 0x30, 0x5f, 0xee, 0x1c, 0xa8, 0x08, 0x23, 0x4d, 0xaf,
|
||||
0x4d, 0xb0, 0x49, 0x6a, 0xcf, 0x2d, 0xb8, 0x1f, 0xe0, 0x93, 0x2f, 0x82, 0x7f, 0xca, 0xc7, 0x3d,
|
||||
0xfa, 0x28, 0xed, 0x1f, 0x91, 0x9c, 0xdc, 0x26, 0x77, 0x6d, 0x5d, 0x11, 0xf6, 0x96, 0xef, 0xeb,
|
||||
0x77, 0x3e, 0xbe, 0x73, 0xcf, 0xe9, 0x81, 0x63, 0x25, 0xb3, 0xa1, 0x9c, 0xa6, 0x49, 0xa6, 0x7a,
|
||||
0xa4, 0x42, 0x25, 0xe9, 0x2a, 0x8b, 0x7a, 0xea, 0x6a, 0x22, 0x29, 0x98, 0x4c, 0x73, 0x95, 0x63,
|
||||
0xa7, 0x56, 0x04, 0x95, 0xe2, 0xf0, 0xc8, 0xa8, 0x63, 0xb5, 0x59, 0xe3, 0xff, 0x6c, 0xc0, 0xee,
|
||||
0x1b, 0x49, 0x14, 0x8e, 0x24, 0x5e, 0x80, 0x4b, 0x59, 0x38, 0xa1, 0x38, 0x57, 0x74, 0x39, 0x95,
|
||||
0x5f, 0x66, 0x92, 0x54, 0xd7, 0x3e, 0xb6, 0x4f, 0xf6, 0x9f, 0x3d, 0x0e, 0x36, 0x79, 0x07, 0xe7,
|
||||
0x4b, 0xb9, 0x28, 0xd5, 0xa7, 0x96, 0x38, 0xa0, 0x15, 0x0e, 0xdf, 0x01, 0x9a, 0xb6, 0x34, 0xc9,
|
||||
0x33, 0x92, 0xdd, 0x7b, 0xec, 0xfb, 0x64, 0xab, 0x6f, 0x29, 0x3f, 0xb5, 0x84, 0x4b, 0xab, 0x24,
|
||||
0xbe, 0x06, 0x27, 0x8a, 0x67, 0xd9, 0xe7, 0x2a, 0xec, 0x0e, 0x9b, 0xfa, 0x9b, 0x4d, 0x5f, 0x15,
|
||||
0xd2, 0x3a, 0x68, 0x3b, 0x32, 0x30, 0x9e, 0xc1, 0xfd, 0xa5, 0x95, 0x0e, 0xd8, 0x60, 0xaf, 0x47,
|
||||
0xb7, 0x7a, 0x55, 0xe1, 0x9c, 0xc8, 0x24, 0xf0, 0x3d, 0x3c, 0x18, 0x27, 0xa3, 0x58, 0x5d, 0x0e,
|
||||
0xc6, 0x79, 0x54, 0xc7, 0x6b, 0xde, 0xd6, 0xf3, 0x59, 0x51, 0xd0, 0x2f, 0xf4, 0x75, 0x46, 0x77,
|
||||
0xbc, 0x4a, 0xe2, 0x47, 0xe8, 0xdc, 0xb4, 0xd6, 0x71, 0x5b, 0xec, 0x7d, 0xb2, 0xdd, 0xbb, 0xca,
|
||||
0x8c, 0xe3, 0x35, 0xb6, 0xdf, 0x84, 0x1d, 0x9a, 0xa5, 0x3e, 0xc2, 0xc1, 0xea, 0x68, 0xfd, 0xef,
|
||||
0x36, 0xb8, 0x6b, 0x73, 0xc1, 0x87, 0xd0, 0x8a, 0x65, 0xe1, 0xc3, 0x8b, 0xd2, 0x10, 0x1a, 0x15,
|
||||
0xfc, 0xa7, 0x7c, 0x9a, 0x86, 0x8a, 0x07, 0xed, 0x08, 0x8d, 0x0a, 0x9e, 0x9f, 0x8a, 0x78, 0x56,
|
||||
0x8e, 0xd0, 0x08, 0x11, 0x1a, 0x71, 0x48, 0x31, 0xbf, 0x7a, 0x5b, 0xf0, 0x37, 0x1e, 0xc2, 0x5e,
|
||||
0x2a, 0x55, 0x38, 0x0c, 0x55, 0xc8, 0x4f, 0xd7, 0x16, 0x15, 0xf6, 0xdf, 0x42, 0xdb, 0x9c, 0xe7,
|
||||
0x7f, 0xe7, 0xe8, 0x40, 0x33, 0xc9, 0x86, 0xf2, 0xab, 0x8e, 0x51, 0x02, 0xff, 0x9b, 0x0d, 0xce,
|
||||
0x8d, 0xd1, 0xde, 0x8d, 0x6f, 0xc1, 0x72, 0x9f, 0xba, 0xbd, 0x12, 0x60, 0x17, 0x76, 0xd3, 0x84,
|
||||
0x28, 0xc9, 0x46, 0xdc, 0xde, 0x9e, 0x58, 0x42, 0xff, 0x29, 0xb8, 0x6b, 0xeb, 0xf0, 0xaf, 0x28,
|
||||
0xfe, 0x39, 0xe0, 0xfa, 0x7c, 0xf1, 0x25, 0xec, 0x1b, 0x7b, 0xa2, 0xff, 0xc6, 0x47, 0xe6, 0x7a,
|
||||
0x94, 0x67, 0xc0, 0x28, 0x85, 0x7a, 0x21, 0xfa, 0x17, 0xbf, 0xe6, 0x9e, 0x7d, 0x3d, 0xf7, 0xec,
|
||||
0x3f, 0x73, 0xcf, 0xfe, 0xb1, 0xf0, 0xac, 0xeb, 0x85, 0x67, 0xfd, 0x5e, 0x78, 0xd6, 0x87, 0x17,
|
||||
0xa3, 0x44, 0xc5, 0xb3, 0x41, 0x10, 0xe5, 0x69, 0xcf, 0x3c, 0x2d, 0xf5, 0x27, 0x5f, 0x96, 0xde,
|
||||
0xa6, 0x73, 0x35, 0x68, 0xf1, 0x6f, 0xcf, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0xc1, 0x45, 0x35,
|
||||
0xee, 0xcd, 0x04, 0x00, 0x00,
|
||||
// 589 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x95, 0x4f, 0x8b, 0xd3, 0x40,
|
||||
0x18, 0xc6, 0x13, 0xb7, 0xdd, 0x96, 0x77, 0x9b, 0x6e, 0x3b, 0x16, 0x29, 0x65, 0x8d, 0x6b, 0x14,
|
||||
0x77, 0x41, 0x68, 0x41, 0x8f, 0xe2, 0xa5, 0x7b, 0x59, 0x61, 0x45, 0x99, 0x75, 0x41, 0x45, 0x28,
|
||||
0x69, 0x3a, 0x26, 0xc1, 0xe6, 0x8f, 0x7d, 0xa7, 0xe0, 0x82, 0x57, 0x4f, 0x5e, 0xfc, 0x2c, 0x7e,
|
||||
0x8a, 0x3d, 0xee, 0xd1, 0x93, 0x48, 0xfb, 0x45, 0x24, 0x93, 0x69, 0x32, 0x6d, 0xda, 0x2e, 0x82,
|
||||
0xb7, 0xbc, 0xcf, 0x3c, 0xf9, 0xf5, 0x99, 0xc9, 0xc3, 0x14, 0x0e, 0x39, 0x0b, 0x47, 0x6c, 0x12,
|
||||
0xf8, 0x21, 0xef, 0x21, 0xb7, 0x39, 0xc3, 0xcb, 0xd0, 0xe9, 0xf1, 0xcb, 0x98, 0x61, 0x37, 0x9e,
|
||||
0x44, 0x3c, 0x22, 0xad, 0xdc, 0xd1, 0xcd, 0x1c, 0x9d, 0x96, 0x1b, 0xb9, 0x91, 0x30, 0xf4, 0x92,
|
||||
0xa7, 0xd4, 0xdb, 0x39, 0x50, 0x68, 0x82, 0xa1, 0x92, 0x3a, 0x77, 0x0b, 0xab, 0xb1, 0x3d, 0xb1,
|
||||
0x03, 0xb9, 0x6c, 0xfd, 0x2c, 0x43, 0xe5, 0x25, 0x43, 0xb4, 0x5d, 0x46, 0x2e, 0xa0, 0x89, 0xa1,
|
||||
0x1d, 0xa3, 0x17, 0x71, 0x1c, 0x4c, 0xd8, 0xe7, 0x29, 0x43, 0xde, 0xd6, 0x0f, 0xf5, 0xe3, 0xbd,
|
||||
0x27, 0x8f, 0xba, 0xeb, 0x02, 0x75, 0xcf, 0x17, 0x76, 0x9a, 0xba, 0x4f, 0x35, 0xda, 0xc0, 0x15,
|
||||
0x8d, 0xbc, 0x05, 0xa2, 0x62, 0x31, 0x8e, 0x42, 0x64, 0xed, 0x5b, 0x82, 0x7b, 0x74, 0x23, 0x37,
|
||||
0xb5, 0x9f, 0x6a, 0xb4, 0x89, 0xab, 0x22, 0x79, 0x01, 0x86, 0xe3, 0x4d, 0xc3, 0x4f, 0x59, 0xd8,
|
||||
0x1d, 0x01, 0xb5, 0xd6, 0x43, 0x4f, 0x12, 0x6b, 0x1e, 0xb4, 0xe6, 0x28, 0x33, 0x39, 0x83, 0xfa,
|
||||
0x02, 0x25, 0x03, 0x96, 0x04, 0xeb, 0xc1, 0x56, 0x56, 0x16, 0xce, 0x70, 0x54, 0x81, 0xbc, 0x83,
|
||||
0xdb, 0x63, 0xdf, 0xf5, 0xf8, 0x60, 0x38, 0x8e, 0x9c, 0x3c, 0x5e, 0x79, 0xdb, 0x9e, 0xcf, 0x92,
|
||||
0x17, 0xfa, 0x89, 0x3f, 0xcf, 0xd8, 0x1c, 0xaf, 0x8a, 0xe4, 0x03, 0xb4, 0x96, 0xd1, 0x32, 0xee,
|
||||
0xae, 0x60, 0x1f, 0xdf, 0xcc, 0xce, 0x32, 0x93, 0x71, 0x41, 0x4d, 0x8e, 0x21, 0xad, 0x47, 0x96,
|
||||
0xb9, 0xb2, 0xed, 0x18, 0x5e, 0x0b, 0x6f, 0x9e, 0xd7, 0x88, 0x55, 0x81, 0xbc, 0x82, 0xfd, 0x8c,
|
||||
0x26, 0x63, 0x56, 0x05, 0xee, 0xe1, 0x76, 0x5c, 0x16, 0xb1, 0x1e, 0x2f, 0x29, 0xfd, 0x32, 0xec,
|
||||
0xe0, 0x34, 0xb0, 0x08, 0x34, 0x56, 0x9b, 0x67, 0x7d, 0xd7, 0xa1, 0x59, 0xa8, 0x0d, 0xb9, 0x03,
|
||||
0xbb, 0x1e, 0x4b, 0xb6, 0x29, 0x7a, 0x5c, 0xa2, 0x72, 0x4a, 0xf4, 0x8f, 0xd1, 0x24, 0xb0, 0xb9,
|
||||
0xe8, 0xa1, 0x41, 0xe5, 0x94, 0xe8, 0xe2, 0x4b, 0xa2, 0xa8, 0x92, 0x41, 0xe5, 0x44, 0x08, 0x94,
|
||||
0x3c, 0x1b, 0x3d, 0x51, 0x8a, 0x1a, 0x15, 0xcf, 0xa4, 0x03, 0xd5, 0x80, 0x71, 0x7b, 0x64, 0x73,
|
||||
0x5b, 0x7c, 0xd9, 0x1a, 0xcd, 0x66, 0xeb, 0x0d, 0xd4, 0xd4, 0xba, 0xfd, 0x73, 0x8e, 0x16, 0x94,
|
||||
0xfd, 0x70, 0xc4, 0xbe, 0xc8, 0x18, 0xe9, 0x60, 0x7d, 0xd3, 0xc1, 0x58, 0x6a, 0xde, 0xff, 0xe1,
|
||||
0x26, 0xaa, 0xd8, 0xa7, 0xdc, 0x5e, 0x3a, 0x90, 0x36, 0x54, 0x02, 0x1f, 0xd1, 0x0f, 0x5d, 0xb1,
|
||||
0xbd, 0x2a, 0x5d, 0x8c, 0xd6, 0x63, 0x68, 0x16, 0xda, 0xba, 0x29, 0x8a, 0x75, 0x0e, 0xa4, 0x58,
|
||||
0x3f, 0xf2, 0x1c, 0xf6, 0x94, 0x1a, 0xcb, 0x5b, 0xe6, 0x40, 0xad, 0x45, 0x7a, 0x89, 0x29, 0xaf,
|
||||
0x42, 0xde, 0x57, 0xeb, 0x08, 0x8c, 0xa5, 0xee, 0x6d, 0xfc, 0xf5, 0xaf, 0x50, 0x5f, 0x6e, 0xd5,
|
||||
0xc6, 0x23, 0xa3, 0xd0, 0x70, 0x12, 0x43, 0x88, 0x53, 0x1c, 0xa4, 0xbd, 0x93, 0x97, 0xd4, 0xfd,
|
||||
0x62, 0xac, 0x93, 0x85, 0x33, 0x85, 0xf7, 0x4b, 0x57, 0xbf, 0xef, 0x69, 0x74, 0xdf, 0x59, 0x91,
|
||||
0x2f, 0xae, 0x66, 0xa6, 0x7e, 0x3d, 0x33, 0xf5, 0x3f, 0x33, 0x53, 0xff, 0x31, 0x37, 0xb5, 0xeb,
|
||||
0xb9, 0xa9, 0xfd, 0x9a, 0x9b, 0xda, 0xfb, 0x67, 0xae, 0xcf, 0xbd, 0xe9, 0xb0, 0xeb, 0x44, 0x41,
|
||||
0x4f, 0xbd, 0xa1, 0xf3, 0xc7, 0xf4, 0x9e, 0x5f, 0xf7, 0x4f, 0x31, 0xdc, 0x15, 0x6b, 0x4f, 0xff,
|
||||
0x06, 0x00, 0x00, 0xff, 0xff, 0xa1, 0xb2, 0xfd, 0x65, 0x48, 0x06, 0x00, 0x00,
|
||||
}
|
||||
|
||||
func (m *Message) Marshal() (dAtA []byte, err error) {
|
||||
@@ -701,6 +832,48 @@ func (m *Message_LightBlockResponse) MarshalToSizedBuffer(dAtA []byte) (int, err
|
||||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
func (m *Message_ParamsRequest) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *Message_ParamsRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
if m.ParamsRequest != nil {
|
||||
{
|
||||
size, err := m.ParamsRequest.MarshalToSizedBuffer(dAtA[:i])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
i -= size
|
||||
i = encodeVarintTypes(dAtA, i, uint64(size))
|
||||
}
|
||||
i--
|
||||
dAtA[i] = 0x3a
|
||||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
func (m *Message_ParamsResponse) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *Message_ParamsResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
if m.ParamsResponse != nil {
|
||||
{
|
||||
size, err := m.ParamsResponse.MarshalToSizedBuffer(dAtA[:i])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
i -= size
|
||||
i = encodeVarintTypes(dAtA, i, uint64(size))
|
||||
}
|
||||
i--
|
||||
dAtA[i] = 0x42
|
||||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
func (m *SnapshotsRequest) Marshal() (dAtA []byte, err error) {
|
||||
size := m.Size()
|
||||
dAtA = make([]byte, size)
|
||||
@@ -932,6 +1105,72 @@ func (m *LightBlockResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func (m *ParamsRequest) Marshal() (dAtA []byte, err error) {
|
||||
size := m.Size()
|
||||
dAtA = make([]byte, size)
|
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dAtA[:n], nil
|
||||
}
|
||||
|
||||
func (m *ParamsRequest) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *ParamsRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if m.Height != 0 {
|
||||
i = encodeVarintTypes(dAtA, i, uint64(m.Height))
|
||||
i--
|
||||
dAtA[i] = 0x8
|
||||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func (m *ParamsResponse) Marshal() (dAtA []byte, err error) {
|
||||
size := m.Size()
|
||||
dAtA = make([]byte, size)
|
||||
n, err := m.MarshalToSizedBuffer(dAtA[:size])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dAtA[:n], nil
|
||||
}
|
||||
|
||||
func (m *ParamsResponse) MarshalTo(dAtA []byte) (int, error) {
|
||||
size := m.Size()
|
||||
return m.MarshalToSizedBuffer(dAtA[:size])
|
||||
}
|
||||
|
||||
func (m *ParamsResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
||||
i := len(dAtA)
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
{
|
||||
size, err := m.ConsensusParams.MarshalToSizedBuffer(dAtA[:i])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
i -= size
|
||||
i = encodeVarintTypes(dAtA, i, uint64(size))
|
||||
}
|
||||
i--
|
||||
dAtA[i] = 0x12
|
||||
if m.Height != 0 {
|
||||
i = encodeVarintTypes(dAtA, i, uint64(m.Height))
|
||||
i--
|
||||
dAtA[i] = 0x8
|
||||
}
|
||||
return len(dAtA) - i, nil
|
||||
}
|
||||
|
||||
func encodeVarintTypes(dAtA []byte, offset int, v uint64) int {
|
||||
offset -= sovTypes(v)
|
||||
base := offset
|
||||
@@ -1027,6 +1266,30 @@ func (m *Message_LightBlockResponse) Size() (n int) {
|
||||
}
|
||||
return n
|
||||
}
|
||||
func (m *Message_ParamsRequest) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
var l int
|
||||
_ = l
|
||||
if m.ParamsRequest != nil {
|
||||
l = m.ParamsRequest.Size()
|
||||
n += 1 + l + sovTypes(uint64(l))
|
||||
}
|
||||
return n
|
||||
}
|
||||
func (m *Message_ParamsResponse) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
var l int
|
||||
_ = l
|
||||
if m.ParamsResponse != nil {
|
||||
l = m.ParamsResponse.Size()
|
||||
n += 1 + l + sovTypes(uint64(l))
|
||||
}
|
||||
return n
|
||||
}
|
||||
func (m *SnapshotsRequest) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
@@ -1130,6 +1393,32 @@ func (m *LightBlockResponse) Size() (n int) {
|
||||
return n
|
||||
}
|
||||
|
||||
func (m *ParamsRequest) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
var l int
|
||||
_ = l
|
||||
if m.Height != 0 {
|
||||
n += 1 + sovTypes(uint64(m.Height))
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (m *ParamsResponse) Size() (n int) {
|
||||
if m == nil {
|
||||
return 0
|
||||
}
|
||||
var l int
|
||||
_ = l
|
||||
if m.Height != 0 {
|
||||
n += 1 + sovTypes(uint64(m.Height))
|
||||
}
|
||||
l = m.ConsensusParams.Size()
|
||||
n += 1 + l + sovTypes(uint64(l))
|
||||
return n
|
||||
}
|
||||
|
||||
func sovTypes(x uint64) (n int) {
|
||||
return (math_bits.Len64(x|1) + 6) / 7
|
||||
}
|
||||
@@ -1375,6 +1664,76 @@ func (m *Message) Unmarshal(dAtA []byte) error {
|
||||
}
|
||||
m.Sum = &Message_LightBlockResponse{v}
|
||||
iNdEx = postIndex
|
||||
case 7:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field ParamsRequest", wireType)
|
||||
}
|
||||
var msglen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowTypes
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
msglen |= int(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if msglen < 0 {
|
||||
return ErrInvalidLengthTypes
|
||||
}
|
||||
postIndex := iNdEx + msglen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthTypes
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
v := &ParamsRequest{}
|
||||
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
m.Sum = &Message_ParamsRequest{v}
|
||||
iNdEx = postIndex
|
||||
case 8:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field ParamsResponse", wireType)
|
||||
}
|
||||
var msglen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowTypes
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
msglen |= int(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if msglen < 0 {
|
||||
return ErrInvalidLengthTypes
|
||||
}
|
||||
postIndex := iNdEx + msglen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthTypes
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
v := &ParamsResponse{}
|
||||
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
m.Sum = &Message_ParamsResponse{v}
|
||||
iNdEx = postIndex
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipTypes(dAtA[iNdEx:])
|
||||
@@ -2044,6 +2403,177 @@ func (m *LightBlockResponse) Unmarshal(dAtA []byte) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (m *ParamsRequest) Unmarshal(dAtA []byte) error {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
preIndex := iNdEx
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowTypes
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
if wireType == 4 {
|
||||
return fmt.Errorf("proto: ParamsRequest: wiretype end group for non-group")
|
||||
}
|
||||
if fieldNum <= 0 {
|
||||
return fmt.Errorf("proto: ParamsRequest: illegal tag %d (wire type %d)", fieldNum, wire)
|
||||
}
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 0 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Height", wireType)
|
||||
}
|
||||
m.Height = 0
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowTypes
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
m.Height |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipTypes(dAtA[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if (skippy < 0) || (iNdEx+skippy) < 0 {
|
||||
return ErrInvalidLengthTypes
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
|
||||
if iNdEx > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (m *ParamsResponse) Unmarshal(dAtA []byte) error {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
preIndex := iNdEx
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowTypes
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
if wireType == 4 {
|
||||
return fmt.Errorf("proto: ParamsResponse: wiretype end group for non-group")
|
||||
}
|
||||
if fieldNum <= 0 {
|
||||
return fmt.Errorf("proto: ParamsResponse: illegal tag %d (wire type %d)", fieldNum, wire)
|
||||
}
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 0 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Height", wireType)
|
||||
}
|
||||
m.Height = 0
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowTypes
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
m.Height |= uint64(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
case 2:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field ConsensusParams", wireType)
|
||||
}
|
||||
var msglen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowTypes
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
msglen |= int(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if msglen < 0 {
|
||||
return ErrInvalidLengthTypes
|
||||
}
|
||||
postIndex := iNdEx + msglen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthTypes
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
if err := m.ConsensusParams.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
iNdEx = postIndex
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipTypes(dAtA[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if (skippy < 0) || (iNdEx+skippy) < 0 {
|
||||
return ErrInvalidLengthTypes
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
|
||||
if iNdEx > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func skipTypes(dAtA []byte) (n int, err error) {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
|
||||
@@ -1,7 +1,9 @@
syntax = "proto3";
package tendermint.statesync;

import "gogoproto/gogo.proto";
import "tendermint/types/types.proto";
import "tendermint/types/params.proto";

option go_package = "github.com/tendermint/tendermint/proto/tendermint/statesync";

@@ -13,6 +15,8 @@ message Message {
    ChunkResponse       chunk_response       = 4;
    LightBlockRequest   light_block_request  = 5;
    LightBlockResponse  light_block_response = 6;
    ParamsRequest       params_request       = 7;
    ParamsResponse      params_response      = 8;
  }
}

@@ -46,4 +50,13 @@ message LightBlockRequest {

message LightBlockResponse {
  tendermint.types.LightBlock light_block = 1;
}

message ParamsRequest {
  uint64 height = 1;
}

message ParamsResponse {
  uint64                           height           = 1;
  tendermint.types.ConsensusParams consensus_params = 2 [(gogoproto.nullable) = false];
}
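
For orientation only (this sketch is not part of the diff; the package name, helper name, and the way the bytes reach a peer are assumptions), the generated Go types above can be used to build and serialize the new params messages roughly as follows:

package example

import (
    "github.com/gogo/protobuf/proto"

    "github.com/tendermint/tendermint/proto/tendermint/statesync"
    tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
)

// respondWithParams (hypothetical helper) wraps the consensus params for a
// height in the statesync Message envelope, using the new params_response
// oneof case (field 8), and marshals it for sending to a peer.
func respondWithParams(height uint64, params tmproto.ConsensusParams) ([]byte, error) {
    msg := &statesync.Message{
        Sum: &statesync.Message_ParamsResponse{
            ParamsResponse: &statesync.ParamsResponse{
                Height:          height,
                ConsensusParams: params, // declared non-nullable, so held by value
            },
        },
    }
    return proto.Marshal(msg)
}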
@@ -38,7 +38,7 @@ var (
    // FIXME: v2 disabled due to flake
    nodeBlockSyncs = uniformChoice{"v0"} // "v2"
    nodeMempools = uniformChoice{"v0", "v1"}
    nodeStateSyncs = uniformChoice{false, true}
    nodeStateSyncs = uniformChoice{e2e.StateSyncDisabled, e2e.StateSyncP2P, e2e.StateSyncRPC}
    nodePersistIntervals = uniformChoice{0, 1, 5}
    nodeSnapshotIntervals = uniformChoice{0, 3}
    nodeRetainBlocks = uniformChoice{0, int(e2e.EvidenceAgeHeight), int(e2e.EvidenceAgeHeight) + 5}
@@ -281,13 +281,17 @@ func generateNode(
        PrivvalProtocol: nodePrivvalProtocols.Choose(r).(string),
        BlockSync: nodeBlockSyncs.Choose(r).(string),
        Mempool: nodeMempools.Choose(r).(string),
        StateSync: nodeStateSyncs.Choose(r).(bool) && startAt > 0,
        StateSync: e2e.StateSyncDisabled,
        PersistInterval: ptrUint64(uint64(nodePersistIntervals.Choose(r).(int))),
        SnapshotInterval: uint64(nodeSnapshotIntervals.Choose(r).(int)),
        RetainBlocks: uint64(nodeRetainBlocks.Choose(r).(int)),
        Perturb: nodePerturbations.Choose(r),
    }

    if startAt > 0 {
        node.StateSync = nodeStateSyncs.Choose(r).(string)
    }

    // If this node is forced to be an archive node, retain all blocks and
    // enable state sync snapshotting.
    if forceArchive {
@@ -316,7 +320,7 @@ func generateNode(
        }
    }

    if node.StateSync {
    if node.StateSync != e2e.StateSyncDisabled {
        node.BlockSync = "v0"
    }

@@ -55,6 +55,7 @@ retain_blocks = 7

[node.validator04]
abci_protocol = "builtin"
snapshot_interval = 5
database = "rocksdb"
persistent_peers = ["validator01"]
perturb = ["pause"]
@@ -62,6 +63,7 @@ perturb = ["pause"]
[node.validator05]
database = "cleveldb"
block_sync = "v0"
state_sync = "p2p"
seeds = ["seed01"]
start_at = 1005 # Becomes part of the validator set at 1010
abci_protocol = "grpc"
@@ -73,10 +75,10 @@ mode = "full"
start_at = 1010
# FIXME: should be v2, disabled due to flake
block_sync = "v0"
persistent_peers = ["validator01", "validator02", "validator03", "validator04", "validator05"]
persistent_peers = ["validator01", "validator02", "validator03", "validator04"]
perturb = ["restart"]
retain_blocks = 7
state_sync = true
state_sync = "rpc"

[node.light01]
mode = "light"

@@ -117,7 +117,8 @@ type ManifestNode struct {
    // block hashes and RPC servers. At least one node in the network must have
    // SnapshotInterval set to non-zero, and the state syncing node must have
    // StartAt set to an appropriate height where a snapshot is available.
    StateSync bool `toml:"state_sync"`
    // StateSync can either be "p2p" or "rpc" or an empty string to disable
    StateSync string `toml:"state_sync"`

    // PersistInterval specifies the height interval at which the application
    // will persist state to disk. Defaults to 1 (every height), setting this to

@@ -50,6 +50,10 @@ const (

    EvidenceAgeHeight int64 = 7
    EvidenceAgeTime time.Duration = 500 * time.Millisecond

    StateSyncP2P = "p2p"
    StateSyncRPC = "rpc"
    StateSyncDisabled = ""
)

// Testnet represents a single testnet.
@@ -81,7 +85,7 @@ type Node struct {
    StartAt int64
    BlockSync string
    Mempool string
    StateSync bool
    StateSync string
    Database string
    ABCIProtocol Protocol
    PrivvalProtocol Protocol
@@ -333,6 +337,11 @@ func (n Node) Validate(testnet Testnet) error {
    default:
        return fmt.Errorf("invalid block sync setting %q", n.BlockSync)
    }
    switch n.StateSync {
    case StateSyncDisabled, StateSyncP2P, StateSyncRPC:
    default:
        return fmt.Errorf("invalid state sync setting %q", n.StateSync)
    }
    switch n.Mempool {
    case "", "v0", "v1":
    default:
@@ -366,7 +375,7 @@ func (n Node) Validate(testnet Testnet) error {
        return fmt.Errorf("cannot start at height %v lower than initial height %v",
            n.StartAt, n.Testnet.InitialHeight)
    }
    if n.StateSync && n.StartAt == 0 {
    if n.StateSync != StateSyncDisabled && n.StartAt == 0 {
        return errors.New("state synced nodes cannot start at the initial height")
    }
    if n.RetainBlocks != 0 && n.RetainBlocks < uint64(EvidenceAgeHeight) {

@@ -168,12 +168,21 @@ func NewCLI() *CLI {
        },
    })

    cli.root.AddCommand(&cobra.Command{
        Use: "pause",
        Short: "Pauses the Docker testnet",
        RunE: func(cmd *cobra.Command, args []string) error {
            logger.Info("Pausing testnet")
            return execCompose(cli.testnet.Dir, "pause")
        },
    })

    cli.root.AddCommand(&cobra.Command{
        Use: "resume",
        Short: "Resumes the Docker testnet",
        RunE: func(cmd *cobra.Command, args []string) error {
            logger.Info("Resuming testnet")
            return execCompose(cli.testnet.Dir, "up")
            return execCompose(cli.testnet.Dir, "unpause")
        },
    })

@@ -302,10 +302,13 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
        cfg.BlockSync.Version = node.BlockSync
    }

    if node.StateSync {
    switch node.StateSync {
    case e2e.StateSyncP2P:
        cfg.StateSync.Enable = true
        cfg.StateSync.UseP2P = true
    case e2e.StateSyncRPC:
        cfg.StateSync.Enable = true
        cfg.StateSync.RPCServers = []string{}

        for _, peer := range node.Testnet.ArchiveNodes() {
            if peer.Name == node.Name {
                continue

@@ -66,7 +66,7 @@ func Start(testnet *e2e.Testnet) error {

    // Update any state sync nodes with a trusted height and hash
    for _, node := range nodeQueue {
        if node.StateSync || node.Mode == e2e.ModeLight {
        if node.StateSync != e2e.StateSyncDisabled || node.Mode == e2e.ModeLight {
            err = UpdateConfigStateSync(node, block.Height, blockID.Hash.Bytes())
            if err != nil {
                return err
@@ -85,17 +85,11 @@ func Start(testnet *e2e.Testnet) error {

            networkHeight = node.StartAt

            logger.Info("Waiting for network to advance before starting catch up node",
                "node", node.Name,
                "height", networkHeight)

            if _, _, err := waitForHeight(testnet, networkHeight); err != nil {
                return err
            }
        }

        logger.Info("Starting catch up node", "node", node.Name, "height", node.StartAt)

        if err := execCompose(testnet.Dir, "up", "-d", node.Name); err != nil {
            return err
        }

@@ -34,7 +34,7 @@ func TestBlock_Header(t *testing.T) {
        }
        // the first blocks after state sync come from the backfill process
        // and are therefore not complete
        if node.StateSync && block.Header.Height <= first+e2e.EvidenceAgeHeight+1 {
        if node.StateSync != e2e.StateSyncDisabled && block.Header.Height <= first+e2e.EvidenceAgeHeight+1 {
            continue
        }
        if block.Header.Height > last {
@@ -70,7 +70,7 @@ func TestBlock_Range(t *testing.T) {
        switch {
        // if the node state synced we ignore any assertions because it's hard to know how far back
        // the node ran reverse sync for
        case node.StateSync:
        case node.StateSync != e2e.StateSyncDisabled:
            break
        case node.RetainBlocks > 0 && int64(node.RetainBlocks) < (last-node.Testnet.InitialHeight+1):
            // Delta handles race conditions in reading first/last heights.
@@ -83,7 +83,7 @@ func TestBlock_Range(t *testing.T) {
        }

        for h := first; h <= last; h++ {
            if node.StateSync && h <= first+e2e.EvidenceAgeHeight+1 {
            if node.StateSync != e2e.StateSyncDisabled && h <= first+e2e.EvidenceAgeHeight+1 {
                continue
            }
            resp, err := client.Block(ctx, &(h))