From 6f6ac5c04e6ea15d1257ef380baaf3ba19020df3 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 8 Jun 2021 19:23:52 +0200 Subject: [PATCH] state sync: reverse sync implementation (#6463) --- CHANGELOG_PENDING.md | 3 +- internal/statesync/block_queue.go | 263 ++++++++++ internal/statesync/block_queue_test.go | 241 +++++++++ internal/statesync/dispatcher.go | 322 ++++++++++++ internal/statesync/dispatcher_test.go | 179 +++++++ internal/statesync/mocks/state_provider.go | 2 +- internal/statesync/reactor.go | 426 +++++++++++++--- internal/statesync/reactor_test.go | 299 ++++++++++- internal/statesync/stateprovider.go | 37 +- internal/test/factory/block.go | 6 + node/node.go | 16 +- node/setup.go | 1 + proto/tendermint/abci/types.proto | 2 +- proto/tendermint/p2p/pex.proto | 1 - proto/tendermint/statesync/message.go | 20 + proto/tendermint/statesync/types.pb.go | 562 +++++++++++++++++++-- proto/tendermint/statesync/types.proto | 20 +- proxy/mocks/app_conn_consensus.go | 2 +- proxy/mocks/app_conn_mempool.go | 2 +- proxy/mocks/app_conn_query.go | 2 +- proxy/mocks/app_conn_snapshot.go | 2 +- rpc/core/blocks.go | 4 +- state/mocks/evidence_pool.go | 2 +- state/mocks/store.go | 16 +- state/store.go | 27 +- store/store.go | 42 ++ test/e2e/networks/simple.toml | 3 +- test/e2e/pkg/testnet.go | 2 +- test/e2e/runner/load.go | 2 +- test/e2e/tests/block_test.go | 15 +- types/block_meta_test.go | 4 +- types/block_test.go | 7 +- types/light_test.go | 4 +- 33 files changed, 2395 insertions(+), 141 deletions(-) create mode 100644 internal/statesync/block_queue.go create mode 100644 internal/statesync/block_queue_test.go create mode 100644 internal/statesync/dispatcher.go create mode 100644 internal/statesync/dispatcher_test.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 2ca8a29d8..2ce47eb6b 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -125,4 +125,5 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi - [privval] \#5638 Increase read/write timeout to 5s and calculate ping interval based on it (@JoeKash) - [blockchain/v1] [\#5701](https://github.com/tendermint/tendermint/pull/5701) Handle peers without blocks (@melekes) - [blockchain/v1] \#5711 Fix deadlock (@melekes) -- [evidence] \#6375 Fix bug with inconsistent LightClientAttackEvidence hashing (cmwaters) +- [evidence] \#6375 Fix bug with inconsistent LightClientAttackEvidence hashing (@cmwaters) +- [statesync] \#6463 Adds Reverse Sync feature to fetch historical light blocks after state sync in order to verify any evidence (@cmwaters) diff --git a/internal/statesync/block_queue.go b/internal/statesync/block_queue.go new file mode 100644 index 000000000..fd8cac278 --- /dev/null +++ b/internal/statesync/block_queue.go @@ -0,0 +1,263 @@ +package statesync + +import ( + "container/heap" + "fmt" + "sync" + "time" + + "github.com/tendermint/tendermint/internal/p2p" + "github.com/tendermint/tendermint/types" +) + +type lightBlockResponse struct { + block *types.LightBlock + peer p2p.NodeID +} + +// a block queue is used for asynchronously fetching and verifying light blocks +type blockQueue struct { + mtx sync.Mutex + + // cursors to keep track of which heights need to be fetched and verified + fetchHeight int64 + verifyHeight int64 + + // termination conditions + stopHeight int64 + stopTime time.Time + terminal *types.LightBlock + + // track failed heights so we know what blocks to try fetch again + failed *maxIntHeap + // also count retries to know when to give up + retries int + maxRetries int + + // store inbound blocks and serve them to a verifying thread via a channel + pending map[int64]lightBlockResponse + verifyCh chan lightBlockResponse + + // waiters are workers on idle until a height is required + waiters []chan int64 + + // this channel is closed once the verification process is complete + doneCh chan struct{} +} + +func newBlockQueue( + startHeight, stopHeight int64, + stopTime time.Time, + maxRetries int, +) *blockQueue { + return &blockQueue{ + stopHeight: stopHeight, + stopTime: stopTime, + fetchHeight: startHeight, + verifyHeight: startHeight, + pending: make(map[int64]lightBlockResponse), + failed: &maxIntHeap{}, + retries: 0, + maxRetries: maxRetries, + waiters: make([]chan int64, 0), + doneCh: make(chan struct{}), + } +} + +// Add adds a block to the queue to be verified and stored +// CONTRACT: light blocks should have passed basic validation +func (q *blockQueue) add(l lightBlockResponse) { + q.mtx.Lock() + defer q.mtx.Unlock() + + // return early if the process has already finished + select { + case <-q.doneCh: + return + default: + } + + // sometimes more blocks are fetched then what is necessary. If we already + // have what we need then ignore this + if q.terminal != nil && l.block.Height < q.terminal.Height { + return + } + + // if the block that was returned is at the verify height then the verifier + // is already waiting for this block so we send it directly to them + if l.block.Height == q.verifyHeight && q.verifyCh != nil { + q.verifyCh <- l + close(q.verifyCh) + q.verifyCh = nil + } else { + // else we add it in the pending bucket + q.pending[l.block.Height] = l + } + + // Lastly, if the incoming block is past the stop time and stop height then + // we mark it as the terminal block + if l.block.Height <= q.stopHeight && l.block.Time.Before(q.stopTime) { + q.terminal = l.block + } +} + +// NextHeight returns the next height that needs to be retrieved. +// We assume that for every height allocated that the peer will eventually add +// the block or signal that it needs to be retried +func (q *blockQueue) nextHeight() <-chan int64 { + q.mtx.Lock() + defer q.mtx.Unlock() + ch := make(chan int64, 1) + // if a previous process failed then we pick up this one + if q.failed.Len() > 0 { + failedHeight := heap.Pop(q.failed) + ch <- failedHeight.(int64) + close(ch) + return ch + } + + if q.terminal == nil { + // return and decrement the fetch height + ch <- q.fetchHeight + q.fetchHeight-- + close(ch) + return ch + } + + // at this point there is no height that we know we need so we create a + // waiter to hold out for either an outgoing request to fail or a block to + // fail verification + q.waiters = append(q.waiters, ch) + return ch +} + +// Finished returns true when the block queue has has all light blocks retrieved, +// verified and stored. There is no more work left to be done +func (q *blockQueue) done() <-chan struct{} { + return q.doneCh +} + +// VerifyNext pulls the next block off the pending queue and adds it to a +// channel if it's already there or creates a waiter to add it to the +// channel once it comes in. NOTE: This is assumed to +// be a single thread as light blocks need to be sequentially verified. +func (q *blockQueue) verifyNext() <-chan lightBlockResponse { + q.mtx.Lock() + defer q.mtx.Unlock() + ch := make(chan lightBlockResponse, 1) + + select { + case <-q.doneCh: + return ch + default: + } + + if lb, ok := q.pending[q.verifyHeight]; ok { + ch <- lb + close(ch) + delete(q.pending, q.verifyHeight) + } else { + q.verifyCh = ch + } + + return ch +} + +// Retry is called when a dispatcher failed to fetch a light block or the +// fetched light block failed verification. It signals to the queue to add the +// height back to the request queue +func (q *blockQueue) retry(height int64) { + q.mtx.Lock() + defer q.mtx.Unlock() + + select { + case <-q.doneCh: + return + default: + } + + // we don't need to retry if this is below the terminal height + if q.terminal != nil && height < q.terminal.Height { + return + } + + q.retries++ + if q.retries >= q.maxRetries { + q._closeChannels() + return + } + + if len(q.waiters) > 0 { + q.waiters[0] <- height + close(q.waiters[0]) + q.waiters = q.waiters[1:] + } else { + heap.Push(q.failed, height) + } +} + +// Success is called when a light block has been successfully verified and +// processed +func (q *blockQueue) success(height int64) { + q.mtx.Lock() + defer q.mtx.Unlock() + if q.terminal != nil && q.verifyHeight == q.terminal.Height { + q._closeChannels() + } + q.verifyHeight-- +} + +func (q *blockQueue) error() error { + q.mtx.Lock() + defer q.mtx.Unlock() + if q.retries >= q.maxRetries { + return fmt.Errorf("failed to backfill blocks following reverse sync. Max retries exceeded (%d). "+ + "Target height: %d, height reached: %d", q.maxRetries, q.stopHeight, q.verifyHeight) + } + return nil +} + +// close the queue and respective channels +func (q *blockQueue) close() { + q.mtx.Lock() + defer q.mtx.Unlock() + q._closeChannels() +} + +// CONTRACT: must have a write lock. Use close instead +func (q *blockQueue) _closeChannels() { + close(q.doneCh) + + // wait for the channel to be drained + select { + case <-q.doneCh: + return + default: + } + + for _, ch := range q.waiters { + close(ch) + } + if q.verifyCh != nil { + close(q.verifyCh) + } +} + +// A max-heap of ints. +type maxIntHeap []int64 + +func (h maxIntHeap) Len() int { return len(h) } +func (h maxIntHeap) Less(i, j int) bool { return h[i] < h[j] } +func (h maxIntHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *maxIntHeap) Push(x interface{}) { + *h = append(*h, x.(int64)) +} + +func (h *maxIntHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/internal/statesync/block_queue_test.go b/internal/statesync/block_queue_test.go new file mode 100644 index 000000000..581def941 --- /dev/null +++ b/internal/statesync/block_queue_test.go @@ -0,0 +1,241 @@ +package statesync + +import ( + "math/rand" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/internal/p2p" + "github.com/tendermint/tendermint/internal/test/factory" +) + +var ( + startHeight int64 = 200 + stopHeight int64 = 100 + stopTime = time.Date(2019, 1, 1, 1, 0, 0, 0, time.UTC) + endTime = stopTime.Add(-1 * time.Second) + numWorkers = 1 +) + +func TestBlockQueueBasic(t *testing.T) { + peerID, err := p2p.NewNodeID("0011223344556677889900112233445566778899") + require.NoError(t, err) + + queue := newBlockQueue(startHeight, stopHeight, stopTime, 1) + wg := &sync.WaitGroup{} + + // asynchronously fetch blocks and add it to the queue + for i := 0; i <= numWorkers; i++ { + wg.Add(1) + go func() { + for { + select { + case height := <-queue.nextHeight(): + queue.add(mockLBResp(t, peerID, height, endTime)) + case <-queue.done(): + wg.Done() + return + } + } + }() + } + + trackingHeight := startHeight + wg.Add(1) + +loop: + for { + select { + case <-queue.done(): + wg.Done() + break loop + + case resp := <-queue.verifyNext(): + // assert that the queue serializes the blocks + require.Equal(t, resp.block.Height, trackingHeight) + trackingHeight-- + queue.success(resp.block.Height) + } + + } + + wg.Wait() + assert.Less(t, trackingHeight, stopHeight) +} + +// Test with spurious failures and retries +func TestBlockQueueWithFailures(t *testing.T) { + peerID, err := p2p.NewNodeID("0011223344556677889900112233445566778899") + require.NoError(t, err) + + queue := newBlockQueue(startHeight, stopHeight, stopTime, 200) + wg := &sync.WaitGroup{} + + failureRate := 4 + for i := 0; i <= numWorkers; i++ { + wg.Add(1) + go func() { + for { + select { + case height := <-queue.nextHeight(): + if rand.Intn(failureRate) == 0 { + queue.retry(height) + } else { + queue.add(mockLBResp(t, peerID, height, endTime)) + } + case <-queue.done(): + wg.Done() + return + } + } + }() + } + + trackingHeight := startHeight + for { + select { + case resp := <-queue.verifyNext(): + // assert that the queue serializes the blocks + assert.Equal(t, resp.block.Height, trackingHeight) + if rand.Intn(failureRate) == 0 { + queue.retry(resp.block.Height) + } else { + trackingHeight-- + queue.success(resp.block.Height) + } + + case <-queue.done(): + wg.Wait() + assert.Less(t, trackingHeight, stopHeight) + return + } + } +} + +// Test that when all the blocks are retrieved that the queue still holds on to +// it's workers and in the event of failure can still fetch the failed block +func TestBlockQueueBlocks(t *testing.T) { + peerID, err := p2p.NewNodeID("0011223344556677889900112233445566778899") + require.NoError(t, err) + queue := newBlockQueue(startHeight, stopHeight, stopTime, 2) + expectedHeight := startHeight + retryHeight := stopHeight + 2 + +loop: + for { + select { + case height := <-queue.nextHeight(): + require.Equal(t, height, expectedHeight) + require.GreaterOrEqual(t, height, stopHeight) + expectedHeight-- + queue.add(mockLBResp(t, peerID, height, endTime)) + case <-time.After(1 * time.Second): + if expectedHeight >= stopHeight { + t.Fatalf("expected next height %d", expectedHeight) + } + break loop + } + } + + // close any waiter channels that the previous worker left hanging + for _, ch := range queue.waiters { + close(ch) + } + queue.waiters = make([]chan int64, 0) + + wg := &sync.WaitGroup{} + wg.Add(1) + // so far so good. The worker is waiting. Now we fail a previous + // block and check that the worker fetches them + go func(t *testing.T) { + defer wg.Done() + select { + case height := <-queue.nextHeight(): + require.Equal(t, retryHeight, height) + case <-time.After(1 * time.Second): + require.Fail(t, "queue didn't ask worker to fetch failed height") + } + }(t) + queue.retry(retryHeight) + wg.Wait() + +} + +func TestBlockQueueAcceptsNoMoreBlocks(t *testing.T) { + peerID, err := p2p.NewNodeID("0011223344556677889900112233445566778899") + require.NoError(t, err) + queue := newBlockQueue(startHeight, stopHeight, stopTime, 1) + defer queue.close() + +loop: + for { + select { + case height := <-queue.nextHeight(): + require.GreaterOrEqual(t, height, stopHeight) + queue.add(mockLBResp(t, peerID, height, endTime)) + case <-time.After(1 * time.Second): + break loop + } + } + + require.Len(t, queue.pending, int(startHeight-stopHeight)+1) + + queue.add(mockLBResp(t, peerID, stopHeight-1, endTime)) + require.Len(t, queue.pending, int(startHeight-stopHeight)+1) +} + +// Test a scenario where more blocks are needed then just the stopheight because +// we haven't found a block with a small enough time. +func TestBlockQueueStopTime(t *testing.T) { + peerID, err := p2p.NewNodeID("0011223344556677889900112233445566778899") + require.NoError(t, err) + + queue := newBlockQueue(startHeight, stopHeight, stopTime, 1) + wg := &sync.WaitGroup{} + + baseTime := stopTime.Add(-50 * time.Second) + + // asynchronously fetch blocks and add it to the queue + for i := 0; i <= numWorkers; i++ { + wg.Add(1) + go func() { + for { + select { + case height := <-queue.nextHeight(): + blockTime := baseTime.Add(time.Duration(height) * time.Second) + queue.add(mockLBResp(t, peerID, height, blockTime)) + case <-queue.done(): + wg.Done() + return + } + } + }() + } + + trackingHeight := startHeight + for { + select { + case resp := <-queue.verifyNext(): + // assert that the queue serializes the blocks + assert.Equal(t, resp.block.Height, trackingHeight) + trackingHeight-- + queue.success(resp.block.Height) + + case <-queue.done(): + wg.Wait() + assert.Less(t, trackingHeight, stopHeight-50) + return + } + } +} + +func mockLBResp(t *testing.T, peer p2p.NodeID, height int64, time time.Time) lightBlockResponse { + return lightBlockResponse{ + block: mockLB(t, height, time, factory.MakeBlockID()), + peer: peer, + } +} diff --git a/internal/statesync/dispatcher.go b/internal/statesync/dispatcher.go new file mode 100644 index 000000000..e58f6d468 --- /dev/null +++ b/internal/statesync/dispatcher.go @@ -0,0 +1,322 @@ +package statesync + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/tendermint/tendermint/internal/p2p" + "github.com/tendermint/tendermint/light/provider" + ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync" + proto "github.com/tendermint/tendermint/proto/tendermint/types" + "github.com/tendermint/tendermint/types" +) + +var ( + errNoConnectedPeers = errors.New("no available peers to dispatch request to") + errUnsolicitedResponse = errors.New("unsolicited light block response") + errNoResponse = errors.New("peer failed to respond within timeout") + errPeerAlreadyBusy = errors.New("peer is already processing a request") + errDisconnected = errors.New("dispatcher has been disconnected") +) + +// dispatcher keeps a list of peers and allows concurrent requests for light +// blocks. NOTE: It is not the responsibility of the dispatcher to verify the +// light blocks. +type dispatcher struct { + availablePeers *peerlist + requestCh chan<- p2p.Envelope + timeout time.Duration + + mtx sync.Mutex + calls map[p2p.NodeID]chan *types.LightBlock + running bool +} + +func newDispatcher(requestCh chan<- p2p.Envelope, timeout time.Duration) *dispatcher { + return &dispatcher{ + availablePeers: newPeerList(), + timeout: timeout, + requestCh: requestCh, + calls: make(map[p2p.NodeID]chan *types.LightBlock), + running: true, + } +} + +func (d *dispatcher) LightBlock(ctx context.Context, height int64) (*types.LightBlock, p2p.NodeID, error) { + d.mtx.Lock() + outgoingCalls := len(d.calls) + d.mtx.Unlock() + + // check to see that the dispatcher is connected to at least one peer + if d.availablePeers.Len() == 0 && outgoingCalls == 0 { + return nil, "", errNoConnectedPeers + } + + // fetch the next peer id in the list and request a light block from that + // peer + peer := d.availablePeers.Pop() + lb, err := d.lightBlock(ctx, height, peer) + return lb, peer, err +} + +func (d *dispatcher) Providers(chainID string, timeout time.Duration) []provider.Provider { + d.mtx.Lock() + defer d.mtx.Unlock() + + providers := make([]provider.Provider, d.availablePeers.Len()) + peers := d.availablePeers.Peers() + for index, peer := range peers { + providers[index] = &blockProvider{ + peer: peer, + dispatcher: d, + chainID: chainID, + timeout: timeout, + } + } + return providers +} + +func (d *dispatcher) stop() { + d.mtx.Lock() + defer d.mtx.Unlock() + d.running = false + for peer, call := range d.calls { + close(call) + delete(d.calls, peer) + } +} + +func (d *dispatcher) start() { + d.mtx.Lock() + defer d.mtx.Unlock() + d.running = true +} + +func (d *dispatcher) lightBlock(ctx context.Context, height int64, peer p2p.NodeID) (*types.LightBlock, error) { + // dispatch the request to the peer + callCh, err := d.dispatch(peer, height) + if err != nil { + return nil, err + } + + // wait for a response, cancel or timeout + select { + case resp := <-callCh: + return resp, nil + + case <-ctx.Done(): + d.release(peer) + return nil, nil + + case <-time.After(d.timeout): + d.release(peer) + return nil, errNoResponse + } +} + +// respond allows the underlying process which receives requests on the +// requestCh to respond with the respective light block +func (d *dispatcher) respond(lb *proto.LightBlock, peer p2p.NodeID) error { + d.mtx.Lock() + defer d.mtx.Unlock() + + // check that the response came from a request + answerCh, ok := d.calls[peer] + if !ok { + // this can also happen if the response came in after the timeout + return errUnsolicitedResponse + } + // release the peer after returning the response + defer d.availablePeers.Append(peer) + defer close(answerCh) + defer delete(d.calls, peer) + + if lb == nil { + answerCh <- nil + return nil + } + + block, err := types.LightBlockFromProto(lb) + if err != nil { + fmt.Println("error with converting light block") + return err + } + + answerCh <- block + return nil +} + +func (d *dispatcher) addPeer(peer p2p.NodeID) { + d.availablePeers.Append(peer) +} + +func (d *dispatcher) removePeer(peer p2p.NodeID) { + d.mtx.Lock() + defer d.mtx.Unlock() + if _, ok := d.calls[peer]; ok { + delete(d.calls, peer) + } else { + d.availablePeers.Remove(peer) + } +} + +// dispatch takes a peer and allocates it a channel so long as it's not already +// busy and the receiving channel is still running. It then dispatches the message +func (d *dispatcher) dispatch(peer p2p.NodeID, height int64) (chan *types.LightBlock, error) { + d.mtx.Lock() + defer d.mtx.Unlock() + ch := make(chan *types.LightBlock, 1) + + // check if the dispatcher is running or not + if !d.running { + close(ch) + return ch, errDisconnected + } + + // this should happen only if we add the same peer twice (somehow) + if _, ok := d.calls[peer]; ok { + close(ch) + return ch, errPeerAlreadyBusy + } + d.calls[peer] = ch + + // send request + d.requestCh <- p2p.Envelope{ + To: peer, + Message: &ssproto.LightBlockRequest{ + Height: uint64(height), + }, + } + return ch, nil +} + +// release appends the peer back to the list and deletes the allocated call so +// that a new call can be made to that peer +func (d *dispatcher) release(peer p2p.NodeID) { + d.mtx.Lock() + defer d.mtx.Unlock() + if call, ok := d.calls[peer]; ok { + close(call) + delete(d.calls, peer) + } + d.availablePeers.Append(peer) +} + +//---------------------------------------------------------------- + +// blockProvider is a p2p based light provider which uses a dispatcher connected +// to the state sync reactor to serve light blocks to the light client +// +// TODO: This should probably be moved over to the light package but as we're +// not yet officially supporting p2p light clients we'll leave this here for now. +type blockProvider struct { + peer p2p.NodeID + chainID string + timeout time.Duration + dispatcher *dispatcher +} + +func (p *blockProvider) LightBlock(ctx context.Context, height int64) (*types.LightBlock, error) { + // FIXME: The provider doesn't know if the dispatcher is still connected to + // that peer. If the connection is dropped for whatever reason the + // dispatcher needs to be able to relay this back to the provider so it can + // return ErrConnectionClosed instead of ErrNoResponse + ctx, cancel := context.WithTimeout(ctx, p.timeout) + defer cancel() + lb, _ := p.dispatcher.lightBlock(ctx, height, p.peer) + if lb == nil { + return nil, provider.ErrNoResponse + } + + if err := lb.ValidateBasic(p.chainID); err != nil { + return nil, provider.ErrBadLightBlock{Reason: err} + } + + return lb, nil +} + +// ReportEvidence should allow for the light client to report any light client +// attacks. This is a no op as there currently isn't a way to wire this up to +// the evidence reactor (we should endeavor to do this in the future but for now +// it's not critical for backwards verification) +func (p *blockProvider) ReportEvidence(ctx context.Context, ev types.Evidence) error { + return nil +} + +// String implements stringer interface +func (p *blockProvider) String() string { return string(p.peer) } + +//---------------------------------------------------------------- + +// peerList is a rolling list of peers. This is used to distribute the load of +// retrieving blocks over all the peers the reactor is connected to +type peerlist struct { + mtx sync.Mutex + peers []p2p.NodeID + waiting []chan p2p.NodeID +} + +func newPeerList() *peerlist { + return &peerlist{ + peers: make([]p2p.NodeID, 0), + waiting: make([]chan p2p.NodeID, 0), + } +} + +func (l *peerlist) Len() int { + l.mtx.Lock() + defer l.mtx.Unlock() + return len(l.peers) +} + +func (l *peerlist) Pop() p2p.NodeID { + l.mtx.Lock() + if len(l.peers) == 0 { + // if we don't have any peers in the list we block until a peer is + // appended + wait := make(chan p2p.NodeID, 1) + l.waiting = append(l.waiting, wait) + // unlock whilst waiting so that the list can be appended to + l.mtx.Unlock() + peer := <-wait + return peer + } + + peer := l.peers[0] + l.peers = l.peers[1:] + l.mtx.Unlock() + return peer +} + +func (l *peerlist) Append(peer p2p.NodeID) { + l.mtx.Lock() + defer l.mtx.Unlock() + if len(l.waiting) > 0 { + wait := l.waiting[0] + l.waiting = l.waiting[1:] + wait <- peer + close(wait) + } else { + l.peers = append(l.peers, peer) + } +} + +func (l *peerlist) Remove(peer p2p.NodeID) { + l.mtx.Lock() + defer l.mtx.Unlock() + for i, p := range l.peers { + if p == peer { + l.peers = append(l.peers[:i], l.peers[i+1:]...) + return + } + } +} + +func (l *peerlist) Peers() []p2p.NodeID { + l.mtx.Lock() + defer l.mtx.Unlock() + return l.peers +} diff --git a/internal/statesync/dispatcher_test.go b/internal/statesync/dispatcher_test.go new file mode 100644 index 000000000..5026abd19 --- /dev/null +++ b/internal/statesync/dispatcher_test.go @@ -0,0 +1,179 @@ +package statesync + +import ( + "context" + "fmt" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/internal/p2p" + ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync" +) + +func TestDispatcherBasic(t *testing.T) { + + ch := make(chan p2p.Envelope, 100) + closeCh := make(chan struct{}) + defer close(closeCh) + + d := newDispatcher(ch, 1*time.Second) + + go handleRequests(t, d, ch, closeCh) + + peers := createPeerSet(5) + for _, peer := range peers { + d.addPeer(peer) + } + + wg := sync.WaitGroup{} + + // make a bunch of async requests and require that the correct responses are + // given + for i := 1; i < 10; i++ { + wg.Add(1) + go func(height int64) { + defer wg.Done() + lb, peer, err := d.LightBlock(context.Background(), height) + require.NoError(t, err) + require.NotNil(t, lb) + require.Equal(t, lb.Height, height) + require.Contains(t, peers, peer) + }(int64(i)) + } + wg.Wait() +} + +func TestDispatcherProviders(t *testing.T) { + + ch := make(chan p2p.Envelope, 100) + chainID := "state-sync-test" + closeCh := make(chan struct{}) + defer close(closeCh) + + d := newDispatcher(ch, 1*time.Second) + + go handleRequests(t, d, ch, closeCh) + + peers := createPeerSet(5) + for _, peer := range peers { + d.addPeer(peer) + } + + providers := d.Providers(chainID, 5*time.Second) + require.Len(t, providers, 5) + for i, p := range providers { + bp, ok := p.(*blockProvider) + require.True(t, ok) + assert.Equal(t, bp.String(), string(peers[i])) + lb, err := p.LightBlock(context.Background(), 10) + assert.Error(t, err) + assert.Nil(t, lb) + } +} + +func TestPeerListBasic(t *testing.T) { + peerList := newPeerList() + assert.Zero(t, peerList.Len()) + numPeers := 10 + peerSet := createPeerSet(numPeers) + + for _, peer := range peerSet { + peerList.Append(peer) + } + + for idx, peer := range peerList.Peers() { + assert.Equal(t, peer, peerSet[idx]) + } + + assert.Equal(t, numPeers, peerList.Len()) + + half := numPeers / 2 + for i := 0; i < half; i++ { + assert.Equal(t, peerSet[i], peerList.Pop()) + } + assert.Equal(t, half, peerList.Len()) + + peerList.Remove(p2p.NodeID("lp")) + assert.Equal(t, half, peerList.Len()) + + peerList.Remove(peerSet[half]) + half++ + assert.Equal(t, peerSet[half], peerList.Pop()) + +} + +func TestPeerListConcurrent(t *testing.T) { + peerList := newPeerList() + numPeers := 10 + + wg := sync.WaitGroup{} + // we run a set of goroutines requesting the next peer in the list. As the + // peer list hasn't been populated each these go routines should block + for i := 0; i < numPeers/2; i++ { + go func() { + _ = peerList.Pop() + wg.Done() + }() + } + + // now we add the peers to the list, this should allow the previously + // blocked go routines to unblock + for _, peer := range createPeerSet(numPeers) { + wg.Add(1) + peerList.Append(peer) + } + + // we request the second half of the peer set + for i := 0; i < numPeers/2; i++ { + go func() { + _ = peerList.Pop() + wg.Done() + }() + } + + // we use a context with cancel and a separate go routine to wait for all + // the other goroutines to close. + ctx, cancel := context.WithCancel(context.Background()) + go func() { wg.Wait(); cancel() }() + + select { + case <-time.After(time.Second): + // not all of the blocked go routines waiting on peers have closed after + // one second. This likely means the list got blocked. + t.Failed() + case <-ctx.Done(): + // there should be no peers remaining + require.Equal(t, 0, peerList.Len()) + } +} + +// handleRequests is a helper function usually run in a separate go routine to +// imitate the expected responses of the reactor wired to the dispatcher +func handleRequests(t *testing.T, d *dispatcher, ch chan p2p.Envelope, closeCh chan struct{}) { + t.Helper() + for { + select { + case request := <-ch: + height := request.Message.(*ssproto.LightBlockRequest).Height + peer := request.To + resp := mockLBResp(t, peer, int64(height), time.Now()) + block, _ := resp.block.ToProto() + require.NoError(t, d.respond(block, resp.peer)) + case <-closeCh: + return + } + } +} + +func createPeerSet(num int) []p2p.NodeID { + peers := make([]p2p.NodeID, num) + for i := 0; i < num; i++ { + peers[i], _ = p2p.NewNodeID(strings.Repeat(fmt.Sprintf("%d", i), 2*p2p.NodeIDByteLength)) + } + return peers +} diff --git a/internal/statesync/mocks/state_provider.go b/internal/statesync/mocks/state_provider.go index 951514060..4f367380d 100644 --- a/internal/statesync/mocks/state_provider.go +++ b/internal/statesync/mocks/state_provider.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.5.1. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index fa38ca293..703cbeedb 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -1,9 +1,11 @@ package statesync import ( + "bytes" "context" "errors" "fmt" + "reflect" "sort" "time" @@ -15,6 +17,7 @@ import ( ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" ) @@ -49,6 +52,17 @@ var ( SendQueueCapacity: 4, RecvMessageCapacity: chunkMsgSize, + MaxSendBytes: 400, + }, + }, + LightBlockChannel: { + MsgType: new(ssproto.Message), + Descriptor: &p2p.ChannelDescriptor{ + ID: byte(LightBlockChannel), + Priority: 1, + SendQueueCapacity: 10, + RecvMessageCapacity: lightBlockMsgSize, + MaxSendBytes: 400, }, }, @@ -62,6 +76,9 @@ const ( // ChunkChannel exchanges chunk contents ChunkChannel = p2p.ChannelID(0x61) + // LightBlockChannel exchanges light blocks + LightBlockChannel = p2p.ChannelID(0x62) + // recentSnapshots is the number of recent snapshots to send and receive per peer. recentSnapshots = 10 @@ -70,6 +87,17 @@ const ( // chunkMsgSize is the maximum size of a chunkResponseMessage chunkMsgSize = int(16e6) + + // lightBlockMsgSize is the maximum size of a lightBlockResponseMessage + lightBlockMsgSize = int(1e7) + + // lightBlockResponseTimeout is how long the dispatcher waits for a peer to + // return a light block + lightBlockResponseTimeout = 10 * time.Second + + // maxLightBlockRequestRetries is the amount of retries acceptable before + // the backfill process aborts + maxLightBlockRequestRetries = 20 ) // Reactor handles state sync, both restoring snapshots for the local node and @@ -77,14 +105,20 @@ const ( type Reactor struct { service.BaseService + stateStore sm.Store + blockStore *store.BlockStore + conn proxy.AppConnSnapshot connQuery proxy.AppConnQuery tempDir string snapshotCh *p2p.Channel chunkCh *p2p.Channel + blockCh *p2p.Channel peerUpdates *p2p.PeerUpdates closeCh chan struct{} + dispatcher *dispatcher + // This will only be set when a state sync is in progress. It is used to feed // received snapshots and chunks into the sync. mtx tmsync.RWMutex @@ -99,8 +133,10 @@ func NewReactor( logger log.Logger, conn proxy.AppConnSnapshot, connQuery proxy.AppConnQuery, - snapshotCh, chunkCh *p2p.Channel, + snapshotCh, chunkCh, blockCh *p2p.Channel, peerUpdates *p2p.PeerUpdates, + stateStore sm.Store, + blockStore *store.BlockStore, tempDir string, ) *Reactor { r := &Reactor{ @@ -108,9 +144,13 @@ func NewReactor( connQuery: connQuery, snapshotCh: snapshotCh, chunkCh: chunkCh, + blockCh: blockCh, peerUpdates: peerUpdates, closeCh: make(chan struct{}), tempDir: tempDir, + dispatcher: newDispatcher(blockCh.Out, lightBlockResponseTimeout), + stateStore: stateStore, + blockStore: blockStore, } r.BaseService = *service.NewBaseService(logger, "StateSync", r) @@ -134,14 +174,21 @@ func (r *Reactor) OnStart() error { // have to deal with bounding workers or pools. go r.processChunkCh() + go r.processBlockCh() + go r.processPeerUpdates() + r.dispatcher.start() + return nil } // OnStop stops the reactor by signaling to all spawned goroutines to exit and // blocking until they all exit. func (r *Reactor) OnStop() { + // tell the dispatcher to stop sending any more requests + r.dispatcher.stop() + // Close closeCh to signal to all spawned goroutines to gracefully exit. All // p2p Channels should execute Close(). close(r.closeCh) @@ -151,9 +198,205 @@ func (r *Reactor) OnStop() { // panics will occur. <-r.snapshotCh.Done() <-r.chunkCh.Done() + <-r.blockCh.Done() <-r.peerUpdates.Done() } +// Sync runs a state sync, fetching snapshots and providing chunks to the +// application. It also saves tendermint state and runs a backfill process to +// retrieve the necessary amount of headers, commits and validators sets to be +// able to process evidence and participate in consensus. +func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) error { + r.mtx.Lock() + if r.syncer != nil { + r.mtx.Unlock() + return errors.New("a state sync is already in progress") + } + + r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.snapshotCh.Out, r.chunkCh.Out, r.tempDir) + r.mtx.Unlock() + + hook := func() { + // request snapshots from all currently connected peers + r.Logger.Debug("requesting snapshots from known peers") + r.snapshotCh.Out <- p2p.Envelope{ + Broadcast: true, + Message: &ssproto.SnapshotsRequest{}, + } + } + + hook() + + state, commit, err := r.syncer.SyncAny(discoveryTime, hook) + if err != nil { + return err + } + + r.mtx.Lock() + r.syncer = nil + r.mtx.Unlock() + + err = r.stateStore.Bootstrap(state) + if err != nil { + return fmt.Errorf("failed to bootstrap node with new state: %w", err) + } + + err = r.blockStore.SaveSeenCommit(state.LastBlockHeight, commit) + if err != nil { + return fmt.Errorf("failed to store last seen commit: %w", err) + } + + // start backfill process to retrieve the necessary headers, commits and + // validator sets + return r.backfill(state) +} + +// Backfill sequentially fetches, verifies and stores light blocks in reverse +// order. It does not stop verifying blocks until reaching a block with a height +// and time that is less or equal to the stopHeight and stopTime. The +// trustedBlockID should be of the header at startHeight. +func (r *Reactor) Backfill( + ctx context.Context, + chainID string, + startHeight, stopHeight int64, + trustedBlockID types.BlockID, + stopTime time.Time, +) error { + r.Logger.Info("starting backfill process...", "startHeight", startHeight, + "stopHeight", stopHeight, "trustedBlockID", trustedBlockID) + + var ( + lastValidatorSet *types.ValidatorSet + lastChangeHeight int64 = startHeight + ) + + queue := newBlockQueue(startHeight, stopHeight, stopTime, maxLightBlockRequestRetries) + + // fetch light blocks across four workers. The aim with deploying concurrent + // workers is to equate the network messaging time with the verification + // time. Ideally we want the verification process to never have to be + // waiting on blocks. If it takes 4s to retrieve a block and 1s to verify + // it, then steady state involves four workers. + for i := 0; i < 4; i++ { + go func() { + for { + select { + case height := <-queue.nextHeight(): + r.Logger.Debug("fetching next block", "height", height) + lb, peer, err := r.dispatcher.LightBlock(ctx, height) + if err != nil { + // we don't punish the peer as it might just not have the block + // at that height + r.Logger.Info("error with fetching light block", + "height", height, "err", err) + queue.retry(height) + continue + } + if lb == nil { + r.Logger.Info("peer didn't have block, fetching from another peer", "height", height) + queue.retry(height) + continue + } + + if lb.Height != height { + r.Logger.Info("peer provided wrong height, retrying...", "height", height) + queue.retry(height) + continue + } + + // run a validate basic. This checks the validator set and commit + // hashes line up + err = lb.ValidateBasic(chainID) + if err != nil { + r.Logger.Info("fetched light block failed validate basic, removing peer...", "err", err) + queue.retry(height) + r.blockCh.Error <- p2p.PeerError{ + NodeID: peer, + Err: fmt.Errorf("received invalid light block: %w", err), + } + continue + } + + // add block to queue to be verified + queue.add(lightBlockResponse{ + block: lb, + peer: peer, + }) + r.Logger.Debug("added light block to processing queue", "height", height) + + case <-queue.done(): + return + } + } + }() + } + + // verify all light blocks + for { + select { + case <-r.closeCh: + queue.close() + return nil + case <-ctx.Done(): + queue.close() + return nil + case resp := <-queue.verifyNext(): + // validate the header hash. We take the last block id of the + // previous header (i.e. one height above) as the trusted hash which + // we equate to. ValidatorsHash and CommitHash have already been + // checked in the `ValidateBasic` + if w, g := trustedBlockID.Hash, resp.block.Hash(); !bytes.Equal(w, g) { + r.Logger.Info("received invalid light block. header hash doesn't match trusted LastBlockID", + "trustedHash", w, "receivedHash", g, "height", resp.block.Height) + r.blockCh.Error <- p2p.PeerError{ + NodeID: resp.peer, + Err: fmt.Errorf("received invalid light block. Expected hash %v, got: %v", w, g), + } + queue.retry(resp.block.Height) + continue + } + + // save the signed headers + err := r.blockStore.SaveSignedHeader(resp.block.SignedHeader, trustedBlockID) + if err != nil { + return err + } + + // check if there has been a change in the validator set + if lastValidatorSet != nil && !bytes.Equal(resp.block.Header.ValidatorsHash, resp.block.Header.NextValidatorsHash) { + // save all the heights that the last validator set was the same + err = r.stateStore.SaveValidatorSets(resp.block.Height+1, lastChangeHeight, lastValidatorSet) + if err != nil { + return err + } + + // update the lastChangeHeight + lastChangeHeight = resp.block.Height + } + + trustedBlockID = resp.block.LastBlockID + queue.success(resp.block.Height) + r.Logger.Info("verified and stored light block", "height", resp.block.Height) + + lastValidatorSet = resp.block.ValidatorSet + + case <-queue.done(): + if err := queue.error(); err != nil { + return err + } + + // save the final batch of validators + return r.stateStore.SaveValidatorSets(queue.terminal.Height, lastChangeHeight, lastValidatorSet) + } + } +} + +// Dispatcher exposes the dispatcher so that a state provider can use it for +// light client verification +func (r *Reactor) Dispatcher() *dispatcher { //nolint:golint + return r.dispatcher +} + // handleSnapshotMessage handles envelopes sent from peers on the // SnapshotChannel. It returns an error only if the Envelope.Message is unknown // for this channel. This should never be called outside of handleMessage. @@ -311,6 +554,44 @@ func (r *Reactor) handleChunkMessage(envelope p2p.Envelope) error { return nil } +func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error { + switch msg := envelope.Message.(type) { + case *ssproto.LightBlockRequest: + r.Logger.Info("received light block request", "height", msg.Height) + lb, err := r.fetchLightBlock(msg.Height) + if err != nil { + r.Logger.Error("failed to retrieve light block", "err", err, "height", msg.Height) + return err + } + + lbproto, err := lb.ToProto() + if err != nil { + r.Logger.Error("marshaling light block to proto", "err", err) + return nil + } + + // NOTE: If we don't have the light block we will send a nil light block + // back to the requested node, indicating that we don't have it. + r.blockCh.Out <- p2p.Envelope{ + To: envelope.From, + Message: &ssproto.LightBlockResponse{ + LightBlock: lbproto, + }, + } + + case *ssproto.LightBlockResponse: + if err := r.dispatcher.respond(msg.LightBlock, envelope.From); err != nil { + r.Logger.Error("error processing light block response", "err", err) + return err + } + + default: + return fmt.Errorf("received unknown message: %T", msg) + } + + return nil +} + // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // It will handle errors and any possible panics gracefully. A caller can handle // any error returned by sending a PeerError on the respective channel. @@ -321,7 +602,7 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err } }() - r.Logger.Debug("received message", "message", envelope.Message, "peer", envelope.From) + r.Logger.Debug("received message", "message", reflect.TypeOf(envelope.Message), "peer", envelope.From) switch chID { case SnapshotChannel: @@ -330,6 +611,9 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err case ChunkChannel: err = r.handleChunkMessage(envelope) + case LightBlockChannel: + err = r.handleLightBlockMessage(envelope) + default: err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) } @@ -338,52 +622,44 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err } // processSnapshotCh initiates a blocking process where we listen for and handle -// envelopes on the SnapshotChannel. Any error encountered during message -// execution will result in a PeerError being sent on the SnapshotChannel. When -// the reactor is stopped, we will catch the signal and close the p2p Channel -// gracefully. +// envelopes on the SnapshotChannel. func (r *Reactor) processSnapshotCh() { - defer r.snapshotCh.Close() - - for { - select { - case envelope := <-r.snapshotCh.In: - if err := r.handleMessage(r.snapshotCh.ID, envelope); err != nil { - r.Logger.Error("failed to process message", "ch_id", r.snapshotCh.ID, "envelope", envelope, "err", err) - r.snapshotCh.Error <- p2p.PeerError{ - NodeID: envelope.From, - Err: err, - } - } - - case <-r.closeCh: - r.Logger.Debug("stopped listening on snapshot channel; closing...") - return - } - } + r.processCh(r.snapshotCh, "snapshot") } // processChunkCh initiates a blocking process where we listen for and handle -// envelopes on the ChunkChannel. Any error encountered during message -// execution will result in a PeerError being sent on the ChunkChannel. When -// the reactor is stopped, we will catch the signal and close the p2p Channel -// gracefully. +// envelopes on the ChunkChannel. func (r *Reactor) processChunkCh() { - defer r.chunkCh.Close() + r.processCh(r.chunkCh, "chunk") +} + +// processBlockCh initiates a blocking process where we listen for and handle +// envelopes on the LightBlockChannel. +func (r *Reactor) processBlockCh() { + r.processCh(r.blockCh, "light block") +} + +// processCh routes state sync messages to their respective handlers. Any error +// encountered during message execution will result in a PeerError being sent on +// the respective channel. When the reactor is stopped, we will catch the signal +// and close the p2p Channel gracefully. +func (r *Reactor) processCh(ch *p2p.Channel, chName string) { + defer ch.Close() for { select { - case envelope := <-r.chunkCh.In: - if err := r.handleMessage(r.chunkCh.ID, envelope); err != nil { - r.Logger.Error("failed to process message", "ch_id", r.chunkCh.ID, "envelope", envelope, "err", err) - r.chunkCh.Error <- p2p.PeerError{ + case envelope := <-ch.In: + if err := r.handleMessage(ch.ID, envelope); err != nil { + r.Logger.Error(fmt.Sprintf("failed to process %s message", chName), + "ch_id", ch.ID, "envelope", envelope, "err", err) + ch.Error <- p2p.PeerError{ NodeID: envelope.From, Err: err, } } case <-r.closeCh: - r.Logger.Debug("stopped listening on chunk channel; closing...") + r.Logger.Debug(fmt.Sprintf("stopped listening on %s channel; closing...", chName)) return } } @@ -397,14 +673,18 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) { r.mtx.RLock() defer r.mtx.RUnlock() - if r.syncer != nil { - switch peerUpdate.Status { - case p2p.PeerStatusUp: + switch peerUpdate.Status { + case p2p.PeerStatusUp: + if r.syncer != nil { r.syncer.AddPeer(peerUpdate.NodeID) + } + r.dispatcher.addPeer(peerUpdate.NodeID) - case p2p.PeerStatusDown: + case p2p.PeerStatusDown: + if r.syncer != nil { r.syncer.RemovePeer(peerUpdate.NodeID) } + r.dispatcher.removePeer(peerUpdate.NodeID) } } @@ -465,34 +745,56 @@ func (r *Reactor) recentSnapshots(n uint32) ([]*snapshot, error) { return snapshots, nil } -// Sync runs a state sync, returning the new state and last commit at the snapshot height. -// The caller must store the state and commit in the state database and block store. -func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) (sm.State, *types.Commit, error) { - r.mtx.Lock() - if r.syncer != nil { - r.mtx.Unlock() - return sm.State{}, nil, errors.New("a state sync is already in progress") +// fetchLightBlock works out whether the node has a light block at a particular +// height and if so returns it so it can be gossiped to peers +func (r *Reactor) fetchLightBlock(height uint64) (*types.LightBlock, error) { + h := int64(height) + + blockMeta := r.blockStore.LoadBlockMeta(h) + if blockMeta == nil { + return nil, nil } - r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.snapshotCh.Out, r.chunkCh.Out, r.tempDir) - r.mtx.Unlock() - - hook := func() { - // request snapshots from all currently connected peers - r.Logger.Debug("requesting snapshots from known peers") - r.snapshotCh.Out <- p2p.Envelope{ - Broadcast: true, - Message: &ssproto.SnapshotsRequest{}, - } + commit := r.blockStore.LoadBlockCommit(h) + if commit == nil { + return nil, nil } - hook() + vals, err := r.stateStore.LoadValidators(h) + if err != nil { + return nil, err + } + if vals == nil { + return nil, nil + } - state, commit, err := r.syncer.SyncAny(discoveryTime, hook) + return &types.LightBlock{ + SignedHeader: &types.SignedHeader{ + Header: &blockMeta.Header, + Commit: commit, + }, + ValidatorSet: vals, + }, nil - r.mtx.Lock() - r.syncer = nil - r.mtx.Unlock() - - return state, commit, err +} + +// backfill is a convenience wrapper around the backfill function. It takes +// state to work out how many prior blocks need to be verified +func (r *Reactor) backfill(state sm.State) error { + params := state.ConsensusParams.Evidence + stopHeight := state.LastBlockHeight - params.MaxAgeNumBlocks + stopTime := state.LastBlockTime.Add(-params.MaxAgeDuration) + // ensure that stop height doesn't go below the initial height + if stopHeight < state.InitialHeight { + stopHeight = state.InitialHeight + // this essentially makes stop time a void criteria for termination + stopTime = state.LastBlockTime + } + return r.Backfill( + context.Background(), + state.ChainID, + state.LastBlockHeight, stopHeight, + state.LastBlockID, + stopTime, + ) } diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index df9667b29..36a8f9075 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -2,17 +2,29 @@ package statesync import ( "context" + "fmt" + "math/rand" + "sync" "testing" "time" + // "github.com/fortytw2/leaktest" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + dbm "github.com/tendermint/tm-db" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/statesync/mocks" + "github.com/tendermint/tendermint/internal/test/factory" "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/light/provider" ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync" + tmproto "github.com/tendermint/tendermint/proto/tendermint/types" proxymocks "github.com/tendermint/tendermint/proxy/mocks" + smmocks "github.com/tendermint/tendermint/state/mocks" + "github.com/tendermint/tendermint/store" + "github.com/tendermint/tendermint/types" ) type reactorTestSuite struct { @@ -33,7 +45,16 @@ type reactorTestSuite struct { chunkOutCh chan p2p.Envelope chunkPeerErrCh chan p2p.PeerError - peerUpdates *p2p.PeerUpdates + blockChannel *p2p.Channel + blockInCh chan p2p.Envelope + blockOutCh chan p2p.Envelope + blockPeerErrCh chan p2p.PeerError + + peerUpdateCh chan p2p.PeerUpdate + peerUpdates *p2p.PeerUpdates + + stateStore *smmocks.Store + blockStore *store.BlockStore } func setup( @@ -62,12 +83,17 @@ func setup( chunkInCh: make(chan p2p.Envelope, chBuf), chunkOutCh: make(chan p2p.Envelope, chBuf), chunkPeerErrCh: make(chan p2p.PeerError, chBuf), - peerUpdates: p2p.NewPeerUpdates(make(chan p2p.PeerUpdate), int(chBuf)), + blockInCh: make(chan p2p.Envelope, chBuf), + blockOutCh: make(chan p2p.Envelope, chBuf), + blockPeerErrCh: make(chan p2p.PeerError, chBuf), conn: conn, connQuery: connQuery, stateProvider: stateProvider, } + rts.peerUpdateCh = make(chan p2p.PeerUpdate, chBuf) + rts.peerUpdates = p2p.NewPeerUpdates(rts.peerUpdateCh, int(chBuf)) + rts.snapshotChannel = p2p.NewChannel( SnapshotChannel, new(ssproto.Message), @@ -84,16 +110,33 @@ func setup( rts.chunkPeerErrCh, ) + rts.blockChannel = p2p.NewChannel( + LightBlockChannel, + new(ssproto.Message), + rts.blockInCh, + rts.blockOutCh, + rts.blockPeerErrCh, + ) + + rts.stateStore = &smmocks.Store{} + rts.blockStore = store.NewBlockStore(dbm.NewMemDB()) + rts.reactor = NewReactor( - log.NewNopLogger(), + log.TestingLogger(), conn, connQuery, rts.snapshotChannel, rts.chunkChannel, + rts.blockChannel, rts.peerUpdates, + rts.stateStore, + rts.blockStore, "", ) + // override the dispatcher with one with a shorter timeout + rts.reactor.dispatcher = newDispatcher(rts.blockChannel.Out, 1*time.Second) + rts.syncer = newSyncer( log.NewNopLogger(), conn, @@ -270,6 +313,172 @@ func TestReactor_SnapshotsRequest(t *testing.T) { } } +func TestReactor_LightBlockResponse(t *testing.T) { + rts := setup(t, nil, nil, nil, 2) + + var height int64 = 10 + h := factory.MakeRandomHeader() + h.Height = height + blockID := factory.MakeBlockIDWithHash(h.Hash()) + vals, pv := factory.RandValidatorSet(1, 10) + vote, err := factory.MakeVote(pv[0], h.ChainID, 0, h.Height, 0, 2, + blockID, factory.DefaultTestTime) + require.NoError(t, err) + + sh := &types.SignedHeader{ + Header: h, + Commit: &types.Commit{ + Height: h.Height, + BlockID: blockID, + Signatures: []types.CommitSig{ + vote.CommitSig(), + }, + }, + } + + lb := &types.LightBlock{ + SignedHeader: sh, + ValidatorSet: vals, + } + + require.NoError(t, rts.blockStore.SaveSignedHeader(sh, blockID)) + + rts.stateStore.On("LoadValidators", height).Return(vals, nil) + + rts.blockInCh <- p2p.Envelope{ + From: p2p.NodeID("aa"), + Message: &ssproto.LightBlockRequest{ + Height: 10, + }, + } + require.Empty(t, rts.blockPeerErrCh) + + select { + case response := <-rts.blockOutCh: + require.Equal(t, p2p.NodeID("aa"), response.To) + res, ok := response.Message.(*ssproto.LightBlockResponse) + require.True(t, ok) + receivedLB, err := types.LightBlockFromProto(res.LightBlock) + require.NoError(t, err) + require.Equal(t, lb, receivedLB) + case <-time.After(1 * time.Second): + t.Fatal("expected light block response") + } +} + +func TestReactor_Dispatcher(t *testing.T) { + rts := setup(t, nil, nil, nil, 2) + rts.peerUpdateCh <- p2p.PeerUpdate{ + NodeID: p2p.NodeID("aa"), + Status: p2p.PeerStatusUp, + } + rts.peerUpdateCh <- p2p.PeerUpdate{ + NodeID: p2p.NodeID("bb"), + Status: p2p.PeerStatusUp, + } + + closeCh := make(chan struct{}) + defer close(closeCh) + + chain := buildLightBlockChain(t, 1, 10, time.Now()) + go handleLightBlockRequests(t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0) + + dispatcher := rts.reactor.Dispatcher() + providers := dispatcher.Providers(factory.DefaultTestChainID, 5*time.Second) + require.Len(t, providers, 2) + + wg := sync.WaitGroup{} + + for _, p := range providers { + wg.Add(1) + go func(t *testing.T, p provider.Provider) { + defer wg.Done() + for height := 2; height < 10; height++ { + lb, err := p.LightBlock(context.Background(), int64(height)) + require.NoError(t, err) + require.NotNil(t, lb) + require.Equal(t, height, int(lb.Height)) + } + }(t, p) + } + + ctx, cancel := context.WithCancel(context.Background()) + go func() { wg.Wait(); cancel() }() + + select { + case <-time.After(time.Second): + // not all of the requests to the dispatcher were responded to + // within the timeout + t.Fail() + case <-ctx.Done(): + } +} + +func TestReactor_Backfill(t *testing.T) { + // test backfill algorithm with varying failure rates [0, 10] + failureRates := []int{0, 3, 9} + for _, failureRate := range failureRates { + failureRate := failureRate + t.Run(fmt.Sprintf("failure rate: %d", failureRate), func(t *testing.T) { + // t.Cleanup(leaktest.Check(t)) + rts := setup(t, nil, nil, nil, 21) + + var ( + startHeight int64 = 20 + stopHeight int64 = 10 + stopTime = time.Date(2020, 1, 1, 0, 100, 0, 0, time.UTC) + ) + + peers := []string{"a", "b", "c", "d"} + for _, peer := range peers { + rts.peerUpdateCh <- p2p.PeerUpdate{ + NodeID: p2p.NodeID(peer), + Status: p2p.PeerStatusUp, + } + } + + trackingHeight := startHeight + rts.stateStore.On("SaveValidatorSets", mock.AnythingOfType("int64"), mock.AnythingOfType("int64"), + mock.AnythingOfType("*types.ValidatorSet")).Return(func(lh, uh int64, vals *types.ValidatorSet) error { + require.Equal(t, trackingHeight, lh) + require.Equal(t, lh, uh) + require.GreaterOrEqual(t, lh, stopHeight) + trackingHeight-- + return nil + }) + + chain := buildLightBlockChain(t, stopHeight-1, startHeight+1, stopTime) + + closeCh := make(chan struct{}) + defer close(closeCh) + go handleLightBlockRequests(t, chain, rts.blockOutCh, + rts.blockInCh, closeCh, failureRate) + + err := rts.reactor.Backfill( + context.Background(), + factory.DefaultTestChainID, + startHeight, + stopHeight, + factory.MakeBlockIDWithHash(chain[startHeight].Header.Hash()), + stopTime, + ) + if failureRate > 5 { + require.Error(t, err) + } else { + require.NoError(t, err) + + for height := startHeight; height <= stopHeight; height++ { + blockMeta := rts.blockStore.LoadBlockMeta(height) + require.NotNil(t, blockMeta) + } + + require.Nil(t, rts.blockStore.LoadBlockMeta(stopHeight-1)) + require.Nil(t, rts.blockStore.LoadBlockMeta(startHeight+1)) + } + }) + } +} + // retryUntil will continue to evaluate fn and will return successfully when true // or fail when the timeout is reached. func retryUntil(t *testing.T, fn func() bool, timeout time.Duration) { @@ -284,3 +493,87 @@ func retryUntil(t *testing.T, fn func() bool, timeout time.Duration) { require.NoError(t, ctx.Err()) } } + +func handleLightBlockRequests(t *testing.T, + chain map[int64]*types.LightBlock, + receiving chan p2p.Envelope, + sending chan p2p.Envelope, + close chan struct{}, + failureRate int) { + requests := 0 + for { + select { + case envelope := <-receiving: + if msg, ok := envelope.Message.(*ssproto.LightBlockRequest); ok { + if requests%10 >= failureRate { + lb, err := chain[int64(msg.Height)].ToProto() + require.NoError(t, err) + sending <- p2p.Envelope{ + From: envelope.To, + Message: &ssproto.LightBlockResponse{ + LightBlock: lb, + }, + } + } else { + switch rand.Intn(3) { + case 0: // send a different block + differntLB, err := mockLB(t, int64(msg.Height), factory.DefaultTestTime, factory.MakeBlockID()).ToProto() + require.NoError(t, err) + sending <- p2p.Envelope{ + From: envelope.To, + Message: &ssproto.LightBlockResponse{ + LightBlock: differntLB, + }, + } + case 1: // send nil block i.e. pretend we don't have it + sending <- p2p.Envelope{ + From: envelope.To, + Message: &ssproto.LightBlockResponse{ + LightBlock: nil, + }, + } + case 2: // don't do anything + } + } + } + case <-close: + return + } + requests++ + } +} + +func buildLightBlockChain(t *testing.T, fromHeight, toHeight int64, startTime time.Time) map[int64]*types.LightBlock { + chain := make(map[int64]*types.LightBlock, toHeight-fromHeight) + lastBlockID := factory.MakeBlockID() + blockTime := startTime.Add(-5 * time.Minute) + for height := fromHeight; height < toHeight; height++ { + chain[height] = mockLB(t, height, blockTime, lastBlockID) + lastBlockID = factory.MakeBlockIDWithHash(chain[height].Header.Hash()) + blockTime = blockTime.Add(1 * time.Minute) + } + return chain +} + +func mockLB(t *testing.T, height int64, time time.Time, + lastBlockID types.BlockID) *types.LightBlock { + header, err := factory.MakeHeader(&types.Header{ + Height: height, + LastBlockID: lastBlockID, + Time: time, + }) + require.NoError(t, err) + vals, pv := factory.RandValidatorSet(3, 10) + header.ValidatorsHash = vals.Hash() + lastBlockID = factory.MakeBlockIDWithHash(header.Hash()) + voteSet := types.NewVoteSet(factory.DefaultTestChainID, height, 0, tmproto.PrecommitType, vals) + commit, err := factory.MakeCommit(lastBlockID, height, 0, voteSet, pv, time) + require.NoError(t, err) + return &types.LightBlock{ + SignedHeader: &types.SignedHeader{ + Header: header, + Commit: commit, + }, + ValidatorSet: vals, + } +} diff --git a/internal/statesync/stateprovider.go b/internal/statesync/stateprovider.go index 99eef2bbe..c165f72dc 100644 --- a/internal/statesync/stateprovider.go +++ b/internal/statesync/stateprovider.go @@ -53,7 +53,7 @@ func NewLightClientStateProvider( logger log.Logger, ) (StateProvider, error) { if len(servers) < 2 { - return nil, fmt.Errorf("at least 2 RPC servers are required, got %v", len(servers)) + return nil, fmt.Errorf("at least 2 RPC servers are required, got %d", len(servers)) } providers := make([]lightprovider.Provider, 0, len(servers)) @@ -83,6 +83,41 @@ func NewLightClientStateProvider( }, nil } +// NewLightClientStateProviderFromDispatcher creates a light client state +// provider but uses a p2p connected dispatched instead of RPC endpoints +func NewLightClientStateProviderFromDispatcher( + ctx context.Context, + chainID string, + version sm.Version, + initialHeight int64, + dispatcher *dispatcher, + trustOptions light.TrustOptions, + logger log.Logger, +) (StateProvider, error) { + providers := dispatcher.Providers(chainID, 10*time.Second) + if len(providers) < 2 { + return nil, fmt.Errorf("at least 2 peers are required, got %d", len(providers)) + } + + providersMap := make(map[lightprovider.Provider]string) + for _, p := range providers { + providersMap[p] = p.(*blockProvider).String() + } + + lc, err := light.NewClient(ctx, chainID, trustOptions, providers[0], providers[1:], + lightdb.New(dbm.NewMemDB()), light.Logger(logger)) + if err != nil { + return nil, err + } + + return &lightClientStateProvider{ + lc: lc, + version: version, + initialHeight: initialHeight, + providers: providersMap, + }, nil +} + // AppHash implements StateProvider. func (s *lightClientStateProvider) AppHash(ctx context.Context, height uint64) ([]byte, error) { s.Lock() diff --git a/internal/test/factory/block.go b/internal/test/factory/block.go index 87203221f..f8772f189 100644 --- a/internal/test/factory/block.go +++ b/internal/test/factory/block.go @@ -1,6 +1,8 @@ package factory import ( + "time" + "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/tmhash" "github.com/tendermint/tendermint/types" @@ -11,6 +13,10 @@ const ( DefaultTestChainID = "test-chain" ) +var ( + DefaultTestTime = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) +) + func MakeVersion() version.Consensus { return version.Consensus{ Block: version.BlockProtocol, diff --git a/node/node.go b/node/node.go index a285b69f6..849204dba 100644 --- a/node/node.go +++ b/node/node.go @@ -333,7 +333,10 @@ func makeNode(config *cfg.Config, proxyApp.Query(), channels[statesync.SnapshotChannel], channels[statesync.ChunkChannel], + channels[statesync.LightBlockChannel], peerUpdates, + stateStore, + blockStore, config.StateSync.TempDir, ) @@ -1038,20 +1041,15 @@ func startStateSync(ssR *statesync.Reactor, bcR fastSyncReactor, conR *cs.Reacto } go func() { - state, commit, err := ssR.Sync(stateProvider, config.DiscoveryTime) + err := ssR.Sync(stateProvider, config.DiscoveryTime) if err != nil { ssR.Logger.Error("State sync failed", "err", err) return } - err = stateStore.Bootstrap(state) + + state, err := stateStore.Load() if err != nil { - ssR.Logger.Error("Failed to bootstrap node with new state", "err", err) - return - } - err = blockStore.SaveSeenCommit(state.LastBlockHeight, commit) - if err != nil { - ssR.Logger.Error("Failed to store last seen commit", "err", err) - return + ssR.Logger.Error("failed to load state", "err", err) } if fastSync { diff --git a/node/setup.go b/node/setup.go index 19dcf96bf..d1b8fbd41 100644 --- a/node/setup.go +++ b/node/setup.go @@ -749,6 +749,7 @@ func makeNodeInfo( byte(evidence.EvidenceChannel), byte(statesync.SnapshotChannel), byte(statesync.ChunkChannel), + byte(statesync.LightBlockChannel), }, Moniker: config.Moniker, Other: p2p.NodeInfoOther{ diff --git a/proto/tendermint/abci/types.proto b/proto/tendermint/abci/types.proto index 2bb08a714..bf088e8b1 100644 --- a/proto/tendermint/abci/types.proto +++ b/proto/tendermint/abci/types.proto @@ -213,7 +213,7 @@ message ResponseDeliverTx { message ResponseEndBlock { repeated ValidatorUpdate validator_updates = 1 [(gogoproto.nullable) = false]; tendermint.types.ConsensusParams consensus_param_updates = 2; - repeated Event events = 3 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "events,omitempty"]; + repeated Event events = 3 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "events,omitempty"]; } message ResponseCommit { diff --git a/proto/tendermint/p2p/pex.proto b/proto/tendermint/p2p/pex.proto index 4e630f85f..1f78c9864 100644 --- a/proto/tendermint/p2p/pex.proto +++ b/proto/tendermint/p2p/pex.proto @@ -27,7 +27,6 @@ message PexResponseV2 { repeated PexAddressV2 addresses = 1 [(gogoproto.nullable) = false]; } - message PexMessage { oneof sum { PexRequest pex_request = 1; diff --git a/proto/tendermint/statesync/message.go b/proto/tendermint/statesync/message.go index 71d0b4eb8..6f9b6ad59 100644 --- a/proto/tendermint/statesync/message.go +++ b/proto/tendermint/statesync/message.go @@ -22,6 +22,12 @@ func (m *Message) Wrap(pb proto.Message) error { case *SnapshotsResponse: m.Sum = &Message_SnapshotsResponse{SnapshotsResponse: msg} + case *LightBlockRequest: + m.Sum = &Message_LightBlockRequest{LightBlockRequest: msg} + + case *LightBlockResponse: + m.Sum = &Message_LightBlockResponse{LightBlockResponse: msg} + default: return fmt.Errorf("unknown message: %T", msg) } @@ -45,6 +51,12 @@ func (m *Message) Unwrap() (proto.Message, error) { case *Message_SnapshotsResponse: return m.GetSnapshotsResponse(), nil + case *Message_LightBlockRequest: + return m.GetLightBlockRequest(), nil + + case *Message_LightBlockResponse: + return m.GetLightBlockResponse(), nil + default: return nil, fmt.Errorf("unknown message: %T", msg) } @@ -86,6 +98,14 @@ func (m *Message) Validate() error { return errors.New("snapshot has no chunks") } + case *Message_LightBlockRequest: + if m.GetLightBlockRequest().Height == 0 { + return errors.New("height cannot be 0") + } + + // light block validation handled by the backfill process + case *Message_LightBlockResponse: + default: return fmt.Errorf("unknown message type: %T", msg) } diff --git a/proto/tendermint/statesync/types.pb.go b/proto/tendermint/statesync/types.pb.go index 8391e0ead..f5eab7a33 100644 --- a/proto/tendermint/statesync/types.pb.go +++ b/proto/tendermint/statesync/types.pb.go @@ -6,6 +6,7 @@ package statesync import ( fmt "fmt" proto "github.com/gogo/protobuf/proto" + types "github.com/tendermint/tendermint/proto/tendermint/types" io "io" math "math" math_bits "math/bits" @@ -28,6 +29,8 @@ type Message struct { // *Message_SnapshotsResponse // *Message_ChunkRequest // *Message_ChunkResponse + // *Message_LightBlockRequest + // *Message_LightBlockResponse Sum isMessage_Sum `protobuf_oneof:"sum"` } @@ -82,11 +85,19 @@ type Message_ChunkRequest struct { type Message_ChunkResponse struct { ChunkResponse *ChunkResponse `protobuf:"bytes,4,opt,name=chunk_response,json=chunkResponse,proto3,oneof" json:"chunk_response,omitempty"` } +type Message_LightBlockRequest struct { + LightBlockRequest *LightBlockRequest `protobuf:"bytes,5,opt,name=light_block_request,json=lightBlockRequest,proto3,oneof" json:"light_block_request,omitempty"` +} +type Message_LightBlockResponse struct { + LightBlockResponse *LightBlockResponse `protobuf:"bytes,6,opt,name=light_block_response,json=lightBlockResponse,proto3,oneof" json:"light_block_response,omitempty"` +} -func (*Message_SnapshotsRequest) isMessage_Sum() {} -func (*Message_SnapshotsResponse) isMessage_Sum() {} -func (*Message_ChunkRequest) isMessage_Sum() {} -func (*Message_ChunkResponse) isMessage_Sum() {} +func (*Message_SnapshotsRequest) isMessage_Sum() {} +func (*Message_SnapshotsResponse) isMessage_Sum() {} +func (*Message_ChunkRequest) isMessage_Sum() {} +func (*Message_ChunkResponse) isMessage_Sum() {} +func (*Message_LightBlockRequest) isMessage_Sum() {} +func (*Message_LightBlockResponse) isMessage_Sum() {} func (m *Message) GetSum() isMessage_Sum { if m != nil { @@ -123,6 +134,20 @@ func (m *Message) GetChunkResponse() *ChunkResponse { return nil } +func (m *Message) GetLightBlockRequest() *LightBlockRequest { + if x, ok := m.GetSum().(*Message_LightBlockRequest); ok { + return x.LightBlockRequest + } + return nil +} + +func (m *Message) GetLightBlockResponse() *LightBlockResponse { + if x, ok := m.GetSum().(*Message_LightBlockResponse); ok { + return x.LightBlockResponse + } + return nil +} + // XXX_OneofWrappers is for the internal use of the proto package. func (*Message) XXX_OneofWrappers() []interface{} { return []interface{}{ @@ -130,6 +155,8 @@ func (*Message) XXX_OneofWrappers() []interface{} { (*Message_SnapshotsResponse)(nil), (*Message_ChunkRequest)(nil), (*Message_ChunkResponse)(nil), + (*Message_LightBlockRequest)(nil), + (*Message_LightBlockResponse)(nil), } } @@ -381,43 +408,139 @@ func (m *ChunkResponse) GetMissing() bool { return false } +type LightBlockRequest struct { + Height uint64 `protobuf:"varint,1,opt,name=height,proto3" json:"height,omitempty"` +} + +func (m *LightBlockRequest) Reset() { *m = LightBlockRequest{} } +func (m *LightBlockRequest) String() string { return proto.CompactTextString(m) } +func (*LightBlockRequest) ProtoMessage() {} +func (*LightBlockRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_a1c2869546ca7914, []int{5} +} +func (m *LightBlockRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *LightBlockRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_LightBlockRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *LightBlockRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_LightBlockRequest.Merge(m, src) +} +func (m *LightBlockRequest) XXX_Size() int { + return m.Size() +} +func (m *LightBlockRequest) XXX_DiscardUnknown() { + xxx_messageInfo_LightBlockRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_LightBlockRequest proto.InternalMessageInfo + +func (m *LightBlockRequest) GetHeight() uint64 { + if m != nil { + return m.Height + } + return 0 +} + +type LightBlockResponse struct { + LightBlock *types.LightBlock `protobuf:"bytes,1,opt,name=light_block,json=lightBlock,proto3" json:"light_block,omitempty"` +} + +func (m *LightBlockResponse) Reset() { *m = LightBlockResponse{} } +func (m *LightBlockResponse) String() string { return proto.CompactTextString(m) } +func (*LightBlockResponse) ProtoMessage() {} +func (*LightBlockResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_a1c2869546ca7914, []int{6} +} +func (m *LightBlockResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *LightBlockResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_LightBlockResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *LightBlockResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_LightBlockResponse.Merge(m, src) +} +func (m *LightBlockResponse) XXX_Size() int { + return m.Size() +} +func (m *LightBlockResponse) XXX_DiscardUnknown() { + xxx_messageInfo_LightBlockResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_LightBlockResponse proto.InternalMessageInfo + +func (m *LightBlockResponse) GetLightBlock() *types.LightBlock { + if m != nil { + return m.LightBlock + } + return nil +} + func init() { proto.RegisterType((*Message)(nil), "tendermint.statesync.Message") proto.RegisterType((*SnapshotsRequest)(nil), "tendermint.statesync.SnapshotsRequest") proto.RegisterType((*SnapshotsResponse)(nil), "tendermint.statesync.SnapshotsResponse") proto.RegisterType((*ChunkRequest)(nil), "tendermint.statesync.ChunkRequest") proto.RegisterType((*ChunkResponse)(nil), "tendermint.statesync.ChunkResponse") + proto.RegisterType((*LightBlockRequest)(nil), "tendermint.statesync.LightBlockRequest") + proto.RegisterType((*LightBlockResponse)(nil), "tendermint.statesync.LightBlockResponse") } func init() { proto.RegisterFile("tendermint/statesync/types.proto", fileDescriptor_a1c2869546ca7914) } var fileDescriptor_a1c2869546ca7914 = []byte{ - // 393 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x53, 0xcd, 0x6a, 0xdb, 0x40, - 0x18, 0x94, 0xfc, 0xcf, 0x57, 0xab, 0xd8, 0x8b, 0x29, 0xa2, 0x07, 0x61, 0x54, 0x68, 0x7b, 0x92, - 0xa0, 0x3d, 0xf6, 0xe6, 0x5e, 0x5c, 0x68, 0x2f, 0xdb, 0x18, 0x42, 0x2e, 0x61, 0x2d, 0x6f, 0x24, - 0x11, 0xb4, 0x52, 0xf4, 0xad, 0x20, 0x7e, 0x80, 0x9c, 0x72, 0xc9, 0x63, 0xe5, 0xe8, 0x63, 0xc8, - 0x29, 0xd8, 0x2f, 0x12, 0xb4, 0x92, 0x65, 0xc5, 0x31, 0x09, 0x81, 0xdc, 0x76, 0xc6, 0xe3, 0xd1, - 0xcc, 0xc0, 0x07, 0x63, 0xc9, 0xc5, 0x82, 0xa7, 0x51, 0x28, 0xa4, 0x8b, 0x92, 0x49, 0x8e, 0x4b, - 0xe1, 0xb9, 0x72, 0x99, 0x70, 0x74, 0x92, 0x34, 0x96, 0x31, 0x19, 0xed, 0x14, 0x4e, 0xa5, 0xb0, - 0xef, 0x1b, 0xd0, 0xfd, 0xc7, 0x11, 0x99, 0xcf, 0xc9, 0x0c, 0x86, 0x28, 0x58, 0x82, 0x41, 0x2c, - 0xf1, 0x34, 0xe5, 0x17, 0x19, 0x47, 0x69, 0xea, 0x63, 0xfd, 0xfb, 0x87, 0x1f, 0x5f, 0x9d, 0x43, - 0xff, 0x76, 0xfe, 0x6f, 0xe5, 0xb4, 0x50, 0x4f, 0x35, 0x3a, 0xc0, 0x3d, 0x8e, 0x1c, 0x03, 0xa9, - 0xdb, 0x62, 0x12, 0x0b, 0xe4, 0x66, 0x43, 0xf9, 0x7e, 0x7b, 0xd5, 0xb7, 0x90, 0x4f, 0x35, 0x3a, - 0xc4, 0x7d, 0x92, 0xfc, 0x01, 0xc3, 0x0b, 0x32, 0x71, 0x5e, 0x85, 0x6d, 0x2a, 0x53, 0xfb, 0xb0, - 0xe9, 0xef, 0x5c, 0xba, 0x0b, 0xda, 0xf7, 0x6a, 0x98, 0xfc, 0x85, 0x8f, 0x5b, 0xab, 0x32, 0x60, - 0x4b, 0x79, 0x7d, 0x79, 0xd1, 0xab, 0x0a, 0x67, 0x78, 0x75, 0x62, 0xd2, 0x86, 0x26, 0x66, 0x91, - 0x4d, 0x60, 0xb0, 0xbf, 0x90, 0x7d, 0xad, 0xc3, 0xf0, 0x59, 0x3d, 0xf2, 0x09, 0x3a, 0x01, 0x0f, - 0xfd, 0xa0, 0xd8, 0xbb, 0x45, 0x4b, 0x94, 0xf3, 0x67, 0x71, 0x1a, 0x31, 0xa9, 0xf6, 0x32, 0x68, - 0x89, 0x72, 0x5e, 0x7d, 0x11, 0x55, 0x65, 0x83, 0x96, 0x88, 0x10, 0x68, 0x05, 0x0c, 0x03, 0x15, - 0xbe, 0x4f, 0xd5, 0x9b, 0x7c, 0x86, 0x5e, 0xc4, 0x25, 0x5b, 0x30, 0xc9, 0xcc, 0xb6, 0xe2, 0x2b, - 0x6c, 0x1f, 0x41, 0xbf, 0x3e, 0xcb, 0x9b, 0x73, 0x8c, 0xa0, 0x1d, 0x8a, 0x05, 0xbf, 0x2c, 0x63, - 0x14, 0xc0, 0xbe, 0xd2, 0xc1, 0x78, 0xb2, 0xd0, 0xfb, 0xf8, 0xe6, 0xac, 0xea, 0x59, 0xd6, 0x2b, - 0x00, 0x31, 0xa1, 0x1b, 0x85, 0x88, 0xa1, 0xf0, 0x55, 0xbd, 0x1e, 0xdd, 0xc2, 0xc9, 0xec, 0x76, - 0x6d, 0xe9, 0xab, 0xb5, 0xa5, 0x3f, 0xac, 0x2d, 0xfd, 0x66, 0x63, 0x69, 0xab, 0x8d, 0xa5, 0xdd, - 0x6d, 0x2c, 0xed, 0xe4, 0x97, 0x1f, 0xca, 0x20, 0x9b, 0x3b, 0x5e, 0x1c, 0xb9, 0xb5, 0xcb, 0xa9, - 0x3d, 0xd5, 0xd1, 0xb8, 0x87, 0xae, 0x6a, 0xde, 0x51, 0xbf, 0xfd, 0x7c, 0x0c, 0x00, 0x00, 0xff, - 0xff, 0xcc, 0x16, 0xc2, 0x8b, 0x74, 0x03, 0x00, 0x00, + // 485 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0x51, 0x6b, 0xd3, 0x50, + 0x14, 0x4e, 0x5c, 0xdb, 0x8d, 0xb3, 0x46, 0x96, 0x63, 0x91, 0x32, 0x46, 0x18, 0x11, 0x74, 0x20, + 0xa4, 0xa0, 0x8f, 0xe2, 0x4b, 0x7d, 0x99, 0x30, 0x5f, 0xee, 0x1c, 0xa8, 0x08, 0x23, 0x4d, 0xaf, + 0x4d, 0xb0, 0x49, 0x6a, 0xcf, 0x2d, 0xb8, 0x1f, 0xe0, 0x93, 0x2f, 0x82, 0x7f, 0xca, 0xc7, 0x3d, + 0xfa, 0x28, 0xed, 0x1f, 0x91, 0x9c, 0xdc, 0x26, 0x77, 0x6d, 0x5d, 0x11, 0xf6, 0x96, 0xef, 0xeb, + 0x77, 0x3e, 0xbe, 0x73, 0xcf, 0xe9, 0x81, 0x63, 0x25, 0xb3, 0xa1, 0x9c, 0xa6, 0x49, 0xa6, 0x7a, + 0xa4, 0x42, 0x25, 0xe9, 0x2a, 0x8b, 0x7a, 0xea, 0x6a, 0x22, 0x29, 0x98, 0x4c, 0x73, 0x95, 0x63, + 0xa7, 0x56, 0x04, 0x95, 0xe2, 0xf0, 0xc8, 0xa8, 0x63, 0xb5, 0x59, 0xe3, 0xff, 0x6c, 0xc0, 0xee, + 0x1b, 0x49, 0x14, 0x8e, 0x24, 0x5e, 0x80, 0x4b, 0x59, 0x38, 0xa1, 0x38, 0x57, 0x74, 0x39, 0x95, + 0x5f, 0x66, 0x92, 0x54, 0xd7, 0x3e, 0xb6, 0x4f, 0xf6, 0x9f, 0x3d, 0x0e, 0x36, 0x79, 0x07, 0xe7, + 0x4b, 0xb9, 0x28, 0xd5, 0xa7, 0x96, 0x38, 0xa0, 0x15, 0x0e, 0xdf, 0x01, 0x9a, 0xb6, 0x34, 0xc9, + 0x33, 0x92, 0xdd, 0x7b, 0xec, 0xfb, 0x64, 0xab, 0x6f, 0x29, 0x3f, 0xb5, 0x84, 0x4b, 0xab, 0x24, + 0xbe, 0x06, 0x27, 0x8a, 0x67, 0xd9, 0xe7, 0x2a, 0xec, 0x0e, 0x9b, 0xfa, 0x9b, 0x4d, 0x5f, 0x15, + 0xd2, 0x3a, 0x68, 0x3b, 0x32, 0x30, 0x9e, 0xc1, 0xfd, 0xa5, 0x95, 0x0e, 0xd8, 0x60, 0xaf, 0x47, + 0xb7, 0x7a, 0x55, 0xe1, 0x9c, 0xc8, 0x24, 0xf0, 0x3d, 0x3c, 0x18, 0x27, 0xa3, 0x58, 0x5d, 0x0e, + 0xc6, 0x79, 0x54, 0xc7, 0x6b, 0xde, 0xd6, 0xf3, 0x59, 0x51, 0xd0, 0x2f, 0xf4, 0x75, 0x46, 0x77, + 0xbc, 0x4a, 0xe2, 0x47, 0xe8, 0xdc, 0xb4, 0xd6, 0x71, 0x5b, 0xec, 0x7d, 0xb2, 0xdd, 0xbb, 0xca, + 0x8c, 0xe3, 0x35, 0xb6, 0xdf, 0x84, 0x1d, 0x9a, 0xa5, 0x3e, 0xc2, 0xc1, 0xea, 0x68, 0xfd, 0xef, + 0x36, 0xb8, 0x6b, 0x73, 0xc1, 0x87, 0xd0, 0x8a, 0x65, 0xe1, 0xc3, 0x8b, 0xd2, 0x10, 0x1a, 0x15, + 0xfc, 0xa7, 0x7c, 0x9a, 0x86, 0x8a, 0x07, 0xed, 0x08, 0x8d, 0x0a, 0x9e, 0x9f, 0x8a, 0x78, 0x56, + 0x8e, 0xd0, 0x08, 0x11, 0x1a, 0x71, 0x48, 0x31, 0xbf, 0x7a, 0x5b, 0xf0, 0x37, 0x1e, 0xc2, 0x5e, + 0x2a, 0x55, 0x38, 0x0c, 0x55, 0xc8, 0x4f, 0xd7, 0x16, 0x15, 0xf6, 0xdf, 0x42, 0xdb, 0x9c, 0xe7, + 0x7f, 0xe7, 0xe8, 0x40, 0x33, 0xc9, 0x86, 0xf2, 0xab, 0x8e, 0x51, 0x02, 0xff, 0x9b, 0x0d, 0xce, + 0x8d, 0xd1, 0xde, 0x8d, 0x6f, 0xc1, 0x72, 0x9f, 0xba, 0xbd, 0x12, 0x60, 0x17, 0x76, 0xd3, 0x84, + 0x28, 0xc9, 0x46, 0xdc, 0xde, 0x9e, 0x58, 0x42, 0xff, 0x29, 0xb8, 0x6b, 0xeb, 0xf0, 0xaf, 0x28, + 0xfe, 0x39, 0xe0, 0xfa, 0x7c, 0xf1, 0x25, 0xec, 0x1b, 0x7b, 0xa2, 0xff, 0xc6, 0x47, 0xe6, 0x7a, + 0x94, 0x67, 0xc0, 0x28, 0x85, 0x7a, 0x21, 0xfa, 0x17, 0xbf, 0xe6, 0x9e, 0x7d, 0x3d, 0xf7, 0xec, + 0x3f, 0x73, 0xcf, 0xfe, 0xb1, 0xf0, 0xac, 0xeb, 0x85, 0x67, 0xfd, 0x5e, 0x78, 0xd6, 0x87, 0x17, + 0xa3, 0x44, 0xc5, 0xb3, 0x41, 0x10, 0xe5, 0x69, 0xcf, 0x3c, 0x2d, 0xf5, 0x27, 0x5f, 0x96, 0xde, + 0xa6, 0x73, 0x35, 0x68, 0xf1, 0x6f, 0xcf, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0xc1, 0x45, 0x35, + 0xee, 0xcd, 0x04, 0x00, 0x00, } func (m *Message) Marshal() (dAtA []byte, err error) { @@ -536,6 +659,48 @@ func (m *Message_ChunkResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { } return len(dAtA) - i, nil } +func (m *Message_LightBlockRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Message_LightBlockRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.LightBlockRequest != nil { + { + size, err := m.LightBlockRequest.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTypes(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x2a + } + return len(dAtA) - i, nil +} +func (m *Message_LightBlockResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Message_LightBlockResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.LightBlockResponse != nil { + { + size, err := m.LightBlockResponse.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTypes(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x32 + } + return len(dAtA) - i, nil +} func (m *SnapshotsRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -704,6 +869,69 @@ func (m *ChunkResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *LightBlockRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *LightBlockRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *LightBlockRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Height != 0 { + i = encodeVarintTypes(dAtA, i, uint64(m.Height)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *LightBlockResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *LightBlockResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *LightBlockResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.LightBlock != nil { + { + size, err := m.LightBlock.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTypes(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func encodeVarintTypes(dAtA []byte, offset int, v uint64) int { offset -= sovTypes(v) base := offset @@ -775,6 +1003,30 @@ func (m *Message_ChunkResponse) Size() (n int) { } return n } +func (m *Message_LightBlockRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.LightBlockRequest != nil { + l = m.LightBlockRequest.Size() + n += 1 + l + sovTypes(uint64(l)) + } + return n +} +func (m *Message_LightBlockResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.LightBlockResponse != nil { + l = m.LightBlockResponse.Size() + n += 1 + l + sovTypes(uint64(l)) + } + return n +} func (m *SnapshotsRequest) Size() (n int) { if m == nil { return 0 @@ -853,6 +1105,31 @@ func (m *ChunkResponse) Size() (n int) { return n } +func (m *LightBlockRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Height != 0 { + n += 1 + sovTypes(uint64(m.Height)) + } + return n +} + +func (m *LightBlockResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.LightBlock != nil { + l = m.LightBlock.Size() + n += 1 + l + sovTypes(uint64(l)) + } + return n +} + func sovTypes(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -1028,6 +1305,76 @@ func (m *Message) Unmarshal(dAtA []byte) error { } m.Sum = &Message_ChunkResponse{v} iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LightBlockRequest", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &LightBlockRequest{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Sum = &Message_LightBlockRequest{v} + iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LightBlockResponse", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &LightBlockResponse{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Sum = &Message_LightBlockResponse{v} + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipTypes(dAtA[iNdEx:]) @@ -1542,6 +1889,161 @@ func (m *ChunkResponse) Unmarshal(dAtA []byte) error { } return nil } +func (m *LightBlockRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LightBlockRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LightBlockRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Height", wireType) + } + m.Height = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Height |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipTypes(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *LightBlockResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LightBlockResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LightBlockResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LightBlock", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.LightBlock == nil { + m.LightBlock = &types.LightBlock{} + } + if err := m.LightBlock.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTypes(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipTypes(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/proto/tendermint/statesync/types.proto b/proto/tendermint/statesync/types.proto index 8d4a714c1..a4dd8e693 100644 --- a/proto/tendermint/statesync/types.proto +++ b/proto/tendermint/statesync/types.proto @@ -1,14 +1,18 @@ syntax = "proto3"; package tendermint.statesync; +import "tendermint/types/types.proto"; + option go_package = "github.com/tendermint/tendermint/proto/tendermint/statesync"; message Message { oneof sum { - SnapshotsRequest snapshots_request = 1; - SnapshotsResponse snapshots_response = 2; - ChunkRequest chunk_request = 3; - ChunkResponse chunk_response = 4; + SnapshotsRequest snapshots_request = 1; + SnapshotsResponse snapshots_response = 2; + ChunkRequest chunk_request = 3; + ChunkResponse chunk_response = 4; + LightBlockRequest light_block_request = 5; + LightBlockResponse light_block_response = 6; } } @@ -35,3 +39,11 @@ message ChunkResponse { bytes chunk = 4; bool missing = 5; } + +message LightBlockRequest { + uint64 height = 1; +} + +message LightBlockResponse { + tendermint.types.LightBlock light_block = 1; +} \ No newline at end of file diff --git a/proxy/mocks/app_conn_consensus.go b/proxy/mocks/app_conn_consensus.go index a90dd0c6b..3bf787dbc 100644 --- a/proxy/mocks/app_conn_consensus.go +++ b/proxy/mocks/app_conn_consensus.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.5.1. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks diff --git a/proxy/mocks/app_conn_mempool.go b/proxy/mocks/app_conn_mempool.go index 7d16dadf0..02b8bea8c 100644 --- a/proxy/mocks/app_conn_mempool.go +++ b/proxy/mocks/app_conn_mempool.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.5.1. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks diff --git a/proxy/mocks/app_conn_query.go b/proxy/mocks/app_conn_query.go index 85ac57ccc..6af88ad7c 100644 --- a/proxy/mocks/app_conn_query.go +++ b/proxy/mocks/app_conn_query.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.5.1. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks diff --git a/proxy/mocks/app_conn_snapshot.go b/proxy/mocks/app_conn_snapshot.go index 9ba75860a..6964a8425 100644 --- a/proxy/mocks/app_conn_snapshot.go +++ b/proxy/mocks/app_conn_snapshot.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.5.1. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index 5b3d75769..f36a4e3de 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -98,8 +98,8 @@ func (env *Environment) Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes. block := env.BlockStore.LoadBlock(height) blockMeta := env.BlockStore.LoadBlockMeta(height) - if blockMeta == nil { - return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, nil + if blockMeta == nil || block == nil { + return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: &types.Block{}}, nil } return &ctypes.ResultBlock{BlockID: blockMeta.BlockID, Block: block}, nil } diff --git a/state/mocks/evidence_pool.go b/state/mocks/evidence_pool.go index 0b6ebc97d..9cfc7b40b 100644 --- a/state/mocks/evidence_pool.go +++ b/state/mocks/evidence_pool.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.5.1. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks diff --git a/state/mocks/store.go b/state/mocks/store.go index d1e3a3746..bf70adc86 100644 --- a/state/mocks/store.go +++ b/state/mocks/store.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.5.1. DO NOT EDIT. +// Code generated by mockery v0.0.0-dev. DO NOT EDIT. package mocks @@ -201,3 +201,17 @@ func (_m *Store) SaveABCIResponses(_a0 int64, _a1 *tendermintstate.ABCIResponses return r0 } + +// SaveValidatorSets provides a mock function with given fields: _a0, _a1, _a2 +func (_m *Store) SaveValidatorSets(_a0 int64, _a1 int64, _a2 *types.ValidatorSet) error { + ret := _m.Called(_a0, _a1, _a2) + + var r0 error + if rf, ok := ret.Get(0).(func(int64, int64, *types.ValidatorSet) error); ok { + r0 = rf(_a0, _a1, _a2) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/state/store.go b/state/store.go index a488d0722..15fccbbab 100644 --- a/state/store.go +++ b/state/store.go @@ -93,6 +93,8 @@ type Store interface { Save(State) error // SaveABCIResponses saves ABCIResponses for a given height SaveABCIResponses(int64, *tmstate.ABCIResponses) error + // SaveValidatorSet saves the validator set at a given height + SaveValidatorSets(int64, int64, *types.ValidatorSet) error // Bootstrap is used for bootstrapping state when not starting from a initial height. Bootstrap(State) error // PruneStates takes the height from which to prune up to (exclusive) @@ -502,6 +504,24 @@ func (store dbStore) saveABCIResponses(height int64, abciResponses *tmstate.ABCI return store.db.SetSync(abciResponsesKey(height), bz) } +// SaveValidatorSets is used to save the validator set over multiple heights. +// It is exposed so that a backfill operation during state sync can populate +// the store with the necessary amount of validator sets to verify any evidence +// it may encounter. +func (store dbStore) SaveValidatorSets(lowerHeight, upperHeight int64, vals *types.ValidatorSet) error { + batch := store.db.NewBatch() + defer batch.Close() + + // batch together all the validator sets from lowerHeight to upperHeight + for height := lowerHeight; height <= upperHeight; height++ { + if err := store.saveValidatorsInfo(height, lowerHeight, vals, batch); err != nil { + return err + } + } + + return batch.WriteSync() +} + //----------------------------------------------------------------------------- // LoadValidators loads the ValidatorSet for a given height. @@ -606,12 +626,7 @@ func (store dbStore) saveValidatorsInfo( return err } - err = batch.Set(validatorsKey(height), bz) - if err != nil { - return err - } - - return nil + return batch.Set(validatorsKey(height), bz) } //----------------------------------------------------------------------------- diff --git a/store/store.go b/store/store.go index 8f67fd0c9..1396ca777 100644 --- a/store/store.go +++ b/store/store.go @@ -519,6 +519,48 @@ func (bs *BlockStore) SaveSeenCommit(height int64, seenCommit *types.Commit) err return bs.db.Set(seenCommitKey(height), seenCommitBytes) } +func (bs *BlockStore) SaveSignedHeader(sh *types.SignedHeader, blockID types.BlockID) error { + // first check that the block store doesn't already have the block + bz, err := bs.db.Get(blockMetaKey(sh.Height)) + if err != nil { + return err + } + if bz != nil { + return fmt.Errorf("block at height %d already saved", sh.Height) + } + + // FIXME: saving signed headers although necessary for proving evidence, + // doesn't have complete parity with block meta's thus block size and num + // txs are filled with negative numbers. We should aim to find a solution to + // this. + blockMeta := &types.BlockMeta{ + BlockID: blockID, + BlockSize: -1, + Header: *sh.Header, + NumTxs: -1, + } + + batch := bs.db.NewBatch() + + pbm := blockMeta.ToProto() + metaBytes := mustEncode(pbm) + if err := batch.Set(blockMetaKey(sh.Height), metaBytes); err != nil { + return fmt.Errorf("unable to save block meta: %w", err) + } + + pbc := sh.Commit.ToProto() + blockCommitBytes := mustEncode(pbc) + if err := batch.Set(blockCommitKey(sh.Height), blockCommitBytes); err != nil { + return fmt.Errorf("unable to save commit: %w", err) + } + + if err := batch.WriteSync(); err != nil { + return err + } + + return batch.Close() +} + //---------------------------------- KEY ENCODING ----------------------------------------- // key prefixes diff --git a/test/e2e/networks/simple.toml b/test/e2e/networks/simple.toml index 05cda1819..f96d48011 100644 --- a/test/e2e/networks/simple.toml +++ b/test/e2e/networks/simple.toml @@ -1,5 +1,4 @@ [node.validator01] [node.validator02] [node.validator03] -[node.validator04] - +[node.validator04] \ No newline at end of file diff --git a/test/e2e/pkg/testnet.go b/test/e2e/pkg/testnet.go index 3fb5543a8..2cb626bf6 100644 --- a/test/e2e/pkg/testnet.go +++ b/test/e2e/pkg/testnet.go @@ -49,7 +49,7 @@ const ( PerturbationRestart Perturbation = "restart" EvidenceAgeHeight int64 = 5 - EvidenceAgeTime time.Duration = 10 * time.Second + EvidenceAgeTime time.Duration = 500 * time.Millisecond ) // Testnet represents a single testnet. diff --git a/test/e2e/runner/load.go b/test/e2e/runner/load.go index adeb9c93b..573e46540 100644 --- a/test/e2e/runner/load.go +++ b/test/e2e/runner/load.go @@ -81,7 +81,7 @@ func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int) { select { case chTx <- tx: - time.Sleep(time.Duration(100/multiplier) * time.Millisecond) + time.Sleep(time.Second / time.Duration(multiplier)) case <-ctx.Done(): close(chTx) diff --git a/test/e2e/tests/block_test.go b/test/e2e/tests/block_test.go index b3f4e9139..21aeeda99 100644 --- a/test/e2e/tests/block_test.go +++ b/test/e2e/tests/block_test.go @@ -32,6 +32,11 @@ func TestBlock_Header(t *testing.T) { if block.Header.Height < first { continue } + // the first blocks after state sync come from the backfill process + // and are therefore not complete + if node.StateSync && block.Header.Height <= first+e2e.EvidenceAgeHeight+1 { + continue + } if block.Header.Height > last { break } @@ -63,10 +68,10 @@ func TestBlock_Range(t *testing.T) { last := status.SyncInfo.LatestBlockHeight switch { + // if the node state synced we ignore any assertions because it's hard to know how far back + // the node ran reverse sync for case node.StateSync: - assert.Greater(t, first, node.Testnet.InitialHeight, - "state synced nodes should not contain network's initial height") - + break case node.RetainBlocks > 0 && int64(node.RetainBlocks) < (last-node.Testnet.InitialHeight+1): // Delta handles race conditions in reading first/last heights. assert.InDelta(t, node.RetainBlocks, last-first+1, 1, @@ -78,12 +83,16 @@ func TestBlock_Range(t *testing.T) { } for h := first; h <= last; h++ { + if node.StateSync && h <= first+e2e.EvidenceAgeHeight+1 { + continue + } resp, err := client.Block(ctx, &(h)) if err != nil && node.RetainBlocks > 0 && h == first { // Ignore errors in first block if node is pruning blocks due to race conditions. continue } require.NoError(t, err) + require.NotNil(t, resp.Block) assert.Equal(t, h, resp.Block.Height) } diff --git a/types/block_meta_test.go b/types/block_meta_test.go index 1e29a132a..a1a382ffa 100644 --- a/types/block_meta_test.go +++ b/types/block_meta_test.go @@ -10,7 +10,7 @@ import ( ) func TestBlockMeta_ToProto(t *testing.T) { - h := makeRandHeader() + h := MakeRandHeader() bi := BlockID{Hash: h.Hash(), PartSetHeader: PartSetHeader{Total: 123, Hash: tmrand.Bytes(tmhash.Size)}} bm := &BlockMeta{ @@ -47,7 +47,7 @@ func TestBlockMeta_ToProto(t *testing.T) { } func TestBlockMeta_ValidateBasic(t *testing.T) { - h := makeRandHeader() + h := MakeRandHeader() bi := BlockID{Hash: h.Hash(), PartSetHeader: PartSetHeader{Total: 123, Hash: tmrand.Bytes(tmhash.Size)}} bi2 := BlockID{Hash: tmrand.Bytes(tmhash.Size), PartSetHeader: PartSetHeader{Total: 123, Hash: tmrand.Bytes(tmhash.Size)}} diff --git a/types/block_test.go b/types/block_test.go index 21b251901..8685de6c7 100644 --- a/types/block_test.go +++ b/types/block_test.go @@ -749,7 +749,8 @@ func TestEvidenceDataProtoBuf(t *testing.T) { } } -func makeRandHeader() Header { +// exposed for testing +func MakeRandHeader() Header { chainID := "test" t := time.Now() height := mrand.Int63() @@ -778,7 +779,7 @@ func makeRandHeader() Header { } func TestHeaderProto(t *testing.T) { - h1 := makeRandHeader() + h1 := MakeRandHeader() tc := []struct { msg string h1 *Header @@ -830,7 +831,7 @@ func TestBlockIDProtoBuf(t *testing.T) { func TestSignedHeaderProtoBuf(t *testing.T) { commit := randCommit(time.Now()) - h := makeRandHeader() + h := MakeRandHeader() sh := SignedHeader{Header: &h, Commit: commit} diff --git a/types/light_test.go b/types/light_test.go index abf4374d4..94b2c4b4f 100644 --- a/types/light_test.go +++ b/types/light_test.go @@ -12,7 +12,7 @@ import ( ) func TestLightBlockValidateBasic(t *testing.T) { - header := makeRandHeader() + header := MakeRandHeader() commit := randCommit(time.Now()) vals, _ := randValidatorPrivValSet(5, 1) header.Height = commit.Height @@ -57,7 +57,7 @@ func TestLightBlockValidateBasic(t *testing.T) { } func TestLightBlockProtobuf(t *testing.T) { - header := makeRandHeader() + header := MakeRandHeader() commit := randCommit(time.Now()) vals, _ := randValidatorPrivValSet(5, 1) header.Height = commit.Height