From d9134063e7f556876bdbbcf56d822b3d7c5cee07 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Mon, 24 May 2021 09:48:27 -0400 Subject: [PATCH] rpc: add chunked rpc interface (#6445) --- light/proxy/routes.go | 9 +++++++++ light/rpc/client.go | 4 ++++ node/node.go | 4 ++++ rpc/client/http/http.go | 9 +++++++++ rpc/client/interface.go | 1 + rpc/client/local/local.go | 4 ++++ rpc/client/mocks/client.go | 25 +++++++++++++++++++++++- rpc/client/rpc_test.go | 27 ++++++++++++++++++++++++++ rpc/core/env.go | 38 +++++++++++++++++++++++++++++++++++++ rpc/core/net.go | 27 ++++++++++++++++++++++++++ rpc/core/routes.go | 1 + rpc/core/types/responses.go | 10 ++++++++++ 12 files changed, 158 insertions(+), 1 deletion(-) diff --git a/light/proxy/routes.go b/light/proxy/routes.go index 10d2ba94e..229bf7dc7 100644 --- a/light/proxy/routes.go +++ b/light/proxy/routes.go @@ -23,6 +23,7 @@ func RPCRoutes(c *lrpc.Client) map[string]*rpcserver.RPCFunc { "net_info": rpcserver.NewRPCFunc(makeNetInfoFunc(c), "", false), "blockchain": rpcserver.NewRPCFunc(makeBlockchainInfoFunc(c), "minHeight,maxHeight", true), "genesis": rpcserver.NewRPCFunc(makeGenesisFunc(c), "", true), + "genesis_chunked": rpcserver.NewRPCFunc(makeGenesisChunkedFunc(c), "", true), "block": rpcserver.NewRPCFunc(makeBlockFunc(c), "height", true), "block_by_hash": rpcserver.NewRPCFunc(makeBlockByHashFunc(c), "hash", true), "block_results": rpcserver.NewRPCFunc(makeBlockResultsFunc(c), "height", true), @@ -92,6 +93,14 @@ func makeGenesisFunc(c *lrpc.Client) rpcGenesisFunc { } } +type rpcGenesisChunkedFunc func(ctx *rpctypes.Context, chunk uint) (*ctypes.ResultGenesisChunk, error) + +func makeGenesisChunkedFunc(c *lrpc.Client) rpcGenesisChunkedFunc { + return func(ctx *rpctypes.Context, chunk uint) (*ctypes.ResultGenesisChunk, error) { + return c.GenesisChunked(ctx.Context(), chunk) + } +} + type rpcBlockFunc func(ctx *rpctypes.Context, height *int64) (*ctypes.ResultBlock, error) func makeBlockFunc(c *lrpc.Client) rpcBlockFunc { diff --git a/light/rpc/client.go b/light/rpc/client.go index 6a216ed2e..d8b1e954f 100644 --- a/light/rpc/client.go +++ b/light/rpc/client.go @@ -302,6 +302,10 @@ func (c *Client) Genesis(ctx context.Context) (*ctypes.ResultGenesis, error) { return c.next.Genesis(ctx) } +func (c *Client) GenesisChunked(ctx context.Context, id uint) (*ctypes.ResultGenesisChunk, error) { + return c.next.GenesisChunked(ctx, id) +} + // Block calls rpcclient#Block and then verifies the result. func (c *Client) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) { res, err := c.next.Block(ctx, height) diff --git a/node/node.go b/node/node.go index 08a2bc211..f2cc1e91b 100644 --- a/node/node.go +++ b/node/node.go @@ -827,6 +827,10 @@ func (n *Node) ConfigureRPC() (*rpccore.Environment, error) { } rpcCoreEnv.PubKey = pubKey } + if err := rpcCoreEnv.InitGenesisChunks(); err != nil { + return nil, err + } + return &rpcCoreEnv, nil } diff --git a/rpc/client/http/http.go b/rpc/client/http/http.go index 27f5576c8..54c56f99f 100644 --- a/rpc/client/http/http.go +++ b/rpc/client/http/http.go @@ -397,6 +397,15 @@ func (c *baseRPCClient) Genesis(ctx context.Context) (*ctypes.ResultGenesis, err return result, nil } +func (c *baseRPCClient) GenesisChunked(ctx context.Context, id uint) (*ctypes.ResultGenesisChunk, error) { + result := new(ctypes.ResultGenesisChunk) + _, err := c.caller.Call(ctx, "genesis_chunked", map[string]interface{}{"chunk": id}, result) + if err != nil { + return nil, err + } + return result, nil +} + func (c *baseRPCClient) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) { result := new(ctypes.ResultBlock) params := make(map[string]interface{}) diff --git a/rpc/client/interface.go b/rpc/client/interface.go index e31d1457f..36dc2f7d1 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -94,6 +94,7 @@ type SignClient interface { // HistoryClient provides access to data from genesis to now in large chunks. type HistoryClient interface { Genesis(context.Context) (*ctypes.ResultGenesis, error) + GenesisChunked(context.Context, uint) (*ctypes.ResultGenesisChunk, error) BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) } diff --git a/rpc/client/local/local.go b/rpc/client/local/local.go index 9a0c7871f..7f0f013a8 100644 --- a/rpc/client/local/local.go +++ b/rpc/client/local/local.go @@ -151,6 +151,10 @@ func (c *Local) Genesis(ctx context.Context) (*ctypes.ResultGenesis, error) { return c.env.Genesis(c.ctx) } +func (c *Local) GenesisChunked(ctx context.Context, id uint) (*ctypes.ResultGenesisChunk, error) { + return c.env.GenesisChunked(c.ctx, id) +} + func (c *Local) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) { return c.env.Block(c.ctx, height) } diff --git a/rpc/client/mocks/client.go b/rpc/client/mocks/client.go index 265ba796d..f8eb7a45c 100644 --- a/rpc/client/mocks/client.go +++ b/rpc/client/mocks/client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.6.0. DO NOT EDIT. +// Code generated by mockery 2.7.4. DO NOT EDIT. package mocks @@ -436,6 +436,29 @@ func (_m *Client) Genesis(_a0 context.Context) (*coretypes.ResultGenesis, error) return r0, r1 } +// GenesisChunked provides a mock function with given fields: _a0, _a1 +func (_m *Client) GenesisChunked(_a0 context.Context, _a1 uint) (*coretypes.ResultGenesisChunk, error) { + ret := _m.Called(_a0, _a1) + + var r0 *coretypes.ResultGenesisChunk + if rf, ok := ret.Get(0).(func(context.Context, uint) *coretypes.ResultGenesisChunk); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*coretypes.ResultGenesisChunk) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, uint) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // Health provides a mock function with given fields: _a0 func (_m *Client) Health(_a0 context.Context) (*coretypes.ResultHealth, error) { ret := _m.Called(_a0) diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index 77975e94e..54ddafd61 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -2,6 +2,7 @@ package client_test import ( "context" + "encoding/base64" "fmt" "math" "net/http" @@ -14,6 +15,7 @@ import ( "github.com/stretchr/testify/require" abci "github.com/tendermint/tendermint/abci/types" + tmjson "github.com/tendermint/tendermint/libs/json" "github.com/tendermint/tendermint/libs/log" tmmath "github.com/tendermint/tendermint/libs/math" mempl "github.com/tendermint/tendermint/mempool" @@ -193,6 +195,31 @@ func TestGenesisAndValidators(t *testing.T) { } } +func TestGenesisChunked(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + for _, c := range GetClients(t, NodeSuite(t)) { + first, err := c.GenesisChunked(ctx, 0) + require.NoError(t, err) + + decoded := make([]string, 0, first.TotalChunks) + for i := 0; i < first.TotalChunks; i++ { + chunk, err := c.GenesisChunked(ctx, uint(i)) + require.NoError(t, err) + data, err := base64.StdEncoding.DecodeString(chunk.Data) + require.NoError(t, err) + decoded = append(decoded, string(data)) + + } + doc := []byte(strings.Join(decoded, "")) + + var out types.GenesisDoc + require.NoError(t, tmjson.Unmarshal(doc, &out), + "first: %+v, doc: %s", first, string(doc)) + } +} + func TestABCIQuery(t *testing.T) { for i, c := range GetClients(t, NodeSuite(t)) { // write something diff --git a/rpc/core/env.go b/rpc/core/env.go index 41dce4a50..2dccb2b41 100644 --- a/rpc/core/env.go +++ b/rpc/core/env.go @@ -1,12 +1,14 @@ package core import ( + "encoding/base64" "fmt" "time" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/crypto" + tmjson "github.com/tendermint/tendermint/libs/json" "github.com/tendermint/tendermint/libs/log" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" @@ -25,6 +27,10 @@ const ( // SubscribeTimeout is the maximum time we wait to subscribe for an event. // must be less than the server's write timeout (see rpcserver.DefaultConfig) SubscribeTimeout = 5 * time.Second + + // genesisChunkSize is the maximum size, in bytes, of each + // chunk in the genesis structure for the chunked API + genesisChunkSize = 16 * 1024 * 1024 // 16 ) //---------------------------------------------- @@ -80,6 +86,9 @@ type Environment struct { Logger log.Logger Config cfg.RPCConfig + + // cache of chunked genesis data. + genChunks []string } //---------------------------------------------- @@ -122,6 +131,35 @@ func (env *Environment) validatePerPage(perPagePtr *int) int { return perPage } +// InitGenesisChunks configures the environment and should be called on service +// startup. +func (env *Environment) InitGenesisChunks() error { + if env.genChunks != nil { + return nil + } + + if env.GenDoc == nil { + return nil + } + + data, err := tmjson.Marshal(env.GenDoc) + if err != nil { + return err + } + + for i := 0; i < len(data); i += genesisChunkSize { + end := i + genesisChunkSize + + if end > len(data) { + end = len(data) + } + + env.genChunks = append(env.genChunks, base64.StdEncoding.EncodeToString(data[i:end])) + } + + return nil +} + func validateSkipCount(page, perPage int) int { skipCount := (page - 1) * perPage if skipCount < 0 { diff --git a/rpc/core/net.go b/rpc/core/net.go index 827dad17b..9202a6639 100644 --- a/rpc/core/net.go +++ b/rpc/core/net.go @@ -1,6 +1,7 @@ package core import ( + "errors" "fmt" "strings" @@ -92,9 +93,35 @@ func (env *Environment) UnsafeDialPeers( // Genesis returns genesis file. // More: https://docs.tendermint.com/master/rpc/#/Info/genesis func (env *Environment) Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) { + if len(env.genChunks) > 1 { + return nil, errors.New("genesis response is large, please use the genesis_chunked API instead") + } + return &ctypes.ResultGenesis{Genesis: env.GenDoc}, nil } +func (env *Environment) GenesisChunked(ctx *rpctypes.Context, chunk uint) (*ctypes.ResultGenesisChunk, error) { + if env.genChunks == nil { + return nil, fmt.Errorf("service configuration error, genesis chunks are not initialized") + } + + if len(env.genChunks) == 0 { + return nil, fmt.Errorf("service configuration error, there are no chunks") + } + + id := int(chunk) + + if id > len(env.genChunks)-1 { + return nil, fmt.Errorf("there are %d chunks, %d is invalid", len(env.genChunks)-1, id) + } + + return &ctypes.ResultGenesisChunk{ + TotalChunks: len(env.genChunks), + ChunkNumber: id, + Data: env.genChunks[id], + }, nil +} + func getIDs(peers []string) ([]string, error) { ids := make([]string, 0, len(peers)) diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 5691d0b3f..1eb50fe4e 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -22,6 +22,7 @@ func (env *Environment) GetRoutes() RoutesMap { "net_info": rpc.NewRPCFunc(env.NetInfo, "", false), "blockchain": rpc.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight", true), "genesis": rpc.NewRPCFunc(env.Genesis, "", true), + "genesis_chunked": rpc.NewRPCFunc(env.GenesisChunked, "chunk", true), "block": rpc.NewRPCFunc(env.Block, "height", true), "block_by_hash": rpc.NewRPCFunc(env.BlockByHash, "hash", true), "block_results": rpc.NewRPCFunc(env.BlockResults, "height", true), diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index c5bd2fef2..1319622ad 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -36,6 +36,16 @@ type ResultGenesis struct { Genesis *types.GenesisDoc `json:"genesis"` } +// ResultGenesisChunk is the output format for the chunked/paginated +// interface. These chunks are produced by converting the genesis +// document to JSON and then splitting the resulting payload into +// 16 megabyte blocks and then base64 encoding each block. +type ResultGenesisChunk struct { + ChunkNumber int `json:"chunk"` + TotalChunks int `json:"total"` + Data string `json:"data"` +} + // Single block (with meta) type ResultBlock struct { BlockID types.BlockID `json:"block_id"`