From 45f92ee9289f97e6309175a0936ba6321884ce97 Mon Sep 17 00:00:00 2001 From: William Banfield Date: Wed, 11 Aug 2021 20:37:49 -0400 Subject: [PATCH] remove node refactor --- node/node.go | 147 ++++++++++++++++++++++++++++++++++++++-------- node/node_test.go | 4 +- node/rpc.go | 143 -------------------------------------------- node/setup.go | 68 +++++++++++++++++---- 4 files changed, 182 insertions(+), 180 deletions(-) delete mode 100644 node/rpc.go diff --git a/node/node.go b/node/node.go index 915f1bbfb..ced8af729 100644 --- a/node/node.go +++ b/node/node.go @@ -13,6 +13,7 @@ import ( _ "github.com/lib/pq" // provide the psql db driver "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/rs/cors" abci "github.com/tendermint/tendermint/abci/types" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" @@ -24,6 +25,7 @@ import ( "github.com/tendermint/tendermint/internal/statesync" "github.com/tendermint/tendermint/libs/log" tmnet "github.com/tendermint/tendermint/libs/net" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/libs/strings" tmtime "github.com/tendermint/tendermint/libs/time" @@ -32,6 +34,8 @@ import ( tmgrpc "github.com/tendermint/tendermint/privval/grpc" "github.com/tendermint/tendermint/proxy" rpccore "github.com/tendermint/tendermint/rpc/core" + grpccore "github.com/tendermint/tendermint/rpc/grpc" + rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/store" @@ -569,35 +573,13 @@ func (n *nodeImpl) OnStart() error { // Start the RPC server before the P2P server // so we can eg. receive txs for the first block if n.config.RPC.ListenAddress != "" && n.config.Mode != cfg.ModeSeed { - env, err := n.ConfigureRPC() - if err != nil { - return err - } - - routes := env.GetRoutes() - if n.config.RPC.Unsafe { - routes = rpccore.CombineRoutes(routes, env.UnsafeRoutes()) - } - - listeners, err := startRPCServers(n.config.RPC, env.Logger, routes, n.eventBus) + listeners, err := n.startRPC() if err != nil { return err } n.rpcListeners = listeners } - if n.config.RPC.GRPCListenAddress != "" && n.config.Mode != cfg.ModeSeed { - env, err := n.ConfigureRPC() - if err != nil { - return err - } - listener, err := startRPCServer(n.config.RPC, env, env.Logger) - if err != nil { - return err - } - n.rpcListeners = append(n.rpcListeners, listener) - } - if n.config.Instrumentation.Prometheus && n.config.Instrumentation.PrometheusListenAddr != "" { n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr) @@ -823,6 +805,125 @@ func (n *nodeImpl) ConfigureRPC() (*rpccore.Environment, error) { return &rpcCoreEnv, nil } +func (n *nodeImpl) startRPC() ([]net.Listener, error) { + env, err := n.ConfigureRPC() + if err != nil { + return nil, err + } + + listenAddrs := strings.SplitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ") + routes := env.GetRoutes() + + if n.config.RPC.Unsafe { + env.AddUnsafe(routes) + } + + config := rpcserver.DefaultConfig() + config.MaxBodyBytes = n.config.RPC.MaxBodyBytes + config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes + config.MaxOpenConnections = n.config.RPC.MaxOpenConnections + // If necessary adjust global WriteTimeout to ensure it's greater than + // TimeoutBroadcastTxCommit. + // See https://github.com/tendermint/tendermint/issues/3435 + if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit { + config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second + } + + // we may expose the rpc over both a unix and tcp socket + listeners := make([]net.Listener, len(listenAddrs)) + for i, listenAddr := range listenAddrs { + mux := http.NewServeMux() + rpcLogger := n.Logger.With("module", "rpc-server") + wmLogger := rpcLogger.With("protocol", "websocket") + wm := rpcserver.NewWebsocketManager(routes, + rpcserver.OnDisconnect(func(remoteAddr string) { + err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr) + if err != nil && err != tmpubsub.ErrSubscriptionNotFound { + wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err) + } + }), + rpcserver.ReadLimit(config.MaxBodyBytes), + ) + wm.SetLogger(wmLogger) + mux.HandleFunc("/websocket", wm.WebsocketHandler) + rpcserver.RegisterRPCFuncs(mux, routes, rpcLogger) + listener, err := rpcserver.Listen( + listenAddr, + config, + ) + if err != nil { + return nil, err + } + + var rootHandler http.Handler = mux + if n.config.RPC.IsCorsEnabled() { + corsMiddleware := cors.New(cors.Options{ + AllowedOrigins: n.config.RPC.CORSAllowedOrigins, + AllowedMethods: n.config.RPC.CORSAllowedMethods, + AllowedHeaders: n.config.RPC.CORSAllowedHeaders, + }) + rootHandler = corsMiddleware.Handler(mux) + } + if n.config.RPC.IsTLSEnabled() { + go func() { + if err := rpcserver.ServeTLS( + listener, + rootHandler, + n.config.RPC.CertFile(), + n.config.RPC.KeyFile(), + rpcLogger, + config, + ); err != nil { + n.Logger.Error("Error serving server with TLS", "err", err) + } + }() + } else { + go func() { + if err := rpcserver.Serve( + listener, + rootHandler, + rpcLogger, + config, + ); err != nil { + n.Logger.Error("Error serving server", "err", err) + } + }() + } + + listeners[i] = listener + } + + // we expose a simplified api over grpc for convenience to app devs + grpcListenAddr := n.config.RPC.GRPCListenAddress + if grpcListenAddr != "" { + config := rpcserver.DefaultConfig() + config.MaxBodyBytes = n.config.RPC.MaxBodyBytes + config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes + // NOTE: GRPCMaxOpenConnections is used, not MaxOpenConnections + config.MaxOpenConnections = n.config.RPC.GRPCMaxOpenConnections + // If necessary adjust global WriteTimeout to ensure it's greater than + // TimeoutBroadcastTxCommit. + // See https://github.com/tendermint/tendermint/issues/3435 + if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit { + config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second + } + listener, err := rpcserver.Listen(grpcListenAddr, config) + if err != nil { + return nil, err + } + go func() { + if err := grpccore.StartGRPCServer(env, listener); err != nil { + n.Logger.Error("Error starting gRPC server", "err", err) + } + }() + listeners = append(listeners, listener) + + } + + return listeners, nil + +} + // startPrometheusServer starts a Prometheus HTTP server, listening for metrics // collectors on addr. func (n *nodeImpl) startPrometheusServer(addr string) *http.Server { diff --git a/node/node_test.go b/node/node_test.go index a4d25938d..16edb4210 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -81,9 +81,7 @@ func TestNodeStartStop(t *testing.T) { panic(err) } err = p.Signal(syscall.SIGABRT) - if err != nil { - t.Logf("err: %s", err) - } + fmt.Println(err) t.Fatal("timed out waiting for shutdown") } } diff --git a/node/rpc.go b/node/rpc.go deleted file mode 100644 index f3876905b..000000000 --- a/node/rpc.go +++ /dev/null @@ -1,143 +0,0 @@ -package node - -import ( - "context" - "errors" - "net" - "net/http" - "time" - - "github.com/rs/cors" - - cfg "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/libs/log" - tmpubsub "github.com/tendermint/tendermint/libs/pubsub" - "github.com/tendermint/tendermint/libs/strings" - rpccore "github.com/tendermint/tendermint/rpc/core" - grpccore "github.com/tendermint/tendermint/rpc/grpc" - rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server" - "github.com/tendermint/tendermint/types" -) - -func startRPCServer(rpcConfig *cfg.RPCConfig, env *rpccore.Environment, logger log.Logger) (net.Listener, error) { - // we expose a simplified api over grpc for convenience to app devs - listener, err := rpcserver.Listen(rpcConfig.GRPCListenAddress, rpcConfig.GRPCMaxOpenConnections) - if err != nil { - return nil, err - } - go func() { - if err := grpccore.StartGRPCServer(env, listener); err != nil { - logger.Error("Error starting gRPC server", "err", err) - } - }() - return listener, nil -} - -// nolint: lll -func startRPCServers(rpcConfig *cfg.RPCConfig, logger log.Logger, routes rpccore.RoutesMap, eventBus types.EventBusSubscriber) ([]net.Listener, error) { - config := rpcserver.DefaultConfig() - config.MaxBodyBytes = rpcConfig.MaxBodyBytes - config.MaxHeaderBytes = rpcConfig.MaxHeaderBytes - // If necessary adjust global WriteTimeout to ensure it's greater than - // TimeoutBroadcastTxCommit. - // See https://github.com/tendermint/tendermint/issues/3435 - if config.WriteTimeout <= rpcConfig.TimeoutBroadcastTxCommit { - config.WriteTimeout = rpcConfig.TimeoutBroadcastTxCommit + 1*time.Second - } - - // we may expose the rpc over both a unix and tcp socket - listeners, err := listenersFromRPCConfig(rpcConfig) - if err != nil { - return nil, err - } - for _, listener := range listeners { - mux := http.NewServeMux() - registerWebsocketHandler(rpcConfig, mux, routes, logger, eventBus) - rpcserver.RegisterRPCFuncs(mux, routes, logger) - listenerAddr := listener.Addr().String() - - var rootHandler http.Handler = mux - if rpcConfig.IsCorsEnabled() { - rootHandler = addCORSHandler(rpcConfig, mux) - } - if rpcConfig.IsTLSEnabled() { - go func() { - keyFile := rpcConfig.KeyFile() - certFile := rpcConfig.CertFile() - logger.Info("RPC HTTPS server starting", "address", listenerAddr, - "certfile", certFile, "keyfile", keyFile) - err := rpcserver.ServeTLS(listener, rootHandler, keyFile, certFile, logger, config) - if !errors.Is(err, net.ErrClosed) { - logger.Error("RPC HTTPS server stopped with error", "address", listener, "err", err) - return - } - logger.Info("RPC HTTPS server stopped", "address", listenerAddr) - }() - } else { - go func() { - logger.Info("RPC HTTPS server starting", "address", listenerAddr) - - err := rpcserver.Serve(listener, rootHandler, logger, config) - if !errors.Is(err, net.ErrClosed) { - logger.Error("RPC HTTP server stopped with error", "address", listener, "err", err) - return - } - logger.Info("RPC HTTP server stopped", "address", listenerAddr) - }() - } - } - - return listeners, nil -} - -func listenersFromRPCConfig(rpcConfig *cfg.RPCConfig) ([]net.Listener, error) { - listenAddrs := strings.SplitAndTrimEmpty(rpcConfig.ListenAddress, ",", " ") - listeners := make([]net.Listener, len(listenAddrs)) - for i, listenAddr := range listenAddrs { - listener, err := rpcserver.Listen(listenAddr, rpcConfig.MaxOpenConnections) - if err != nil { - // close any listeners opened before returning - for _, l := range listeners { - if l != nil { - l.Close() - } - } - return nil, err - } - listeners[i] = listener - } - return listeners, nil -} - -func registerWebsocketHandler(rpcConfig *cfg.RPCConfig, - mux *http.ServeMux, - routes rpccore.RoutesMap, - logger log.Logger, - eventBus types.EventBusSubscriber) { - wmLogger := logger.With("protocol", "websocket") - - websocketDisconnectFn := func(remoteAddr string) { - err := eventBus.UnsubscribeAll(context.Background(), remoteAddr) - if err != nil && err != tmpubsub.ErrSubscriptionNotFound { - wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err) - } - } - - wm := rpcserver.NewWebsocketManager(routes, - rpcserver.OnDisconnect(websocketDisconnectFn), - rpcserver.ReadLimit(rpcConfig.MaxBodyBytes)) - wm.SetLogger(wmLogger) - mux.HandleFunc("/websocket", wm.WebsocketHandler) -} - -func addCORSHandler(rpcConfig *cfg.RPCConfig, h http.Handler) http.Handler { - if rpcConfig.IsCorsEnabled() { - corsMiddleware := cors.New(cors.Options{ - AllowedOrigins: rpcConfig.CORSAllowedOrigins, - AllowedMethods: rpcConfig.CORSAllowedMethods, - AllowedHeaders: rpcConfig.CORSAllowedHeaders, - }) - h = corsMiddleware.Handler(h) - } - return h -} diff --git a/node/setup.go b/node/setup.go index bbc92a4ab..af48fb382 100644 --- a/node/setup.go +++ b/node/setup.go @@ -8,6 +8,7 @@ import ( "math" "net" _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port + "strings" "time" dbm "github.com/tendermint/tm-db" @@ -32,26 +33,23 @@ import ( "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/indexer" - "github.com/tendermint/tendermint/state/indexer/sink" + kv "github.com/tendermint/tendermint/state/indexer/sink/kv" + null "github.com/tendermint/tendermint/state/indexer/sink/null" + psql "github.com/tendermint/tendermint/state/indexer/sink/psql" "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/version" ) -const ( - _blockStoreID = "blockstore" - _stateStoreID = "state" -) - func initDBs(config *cfg.Config, dbProvider cfg.DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { var blockStoreDB dbm.DB - blockStoreDB, err = dbProvider(&cfg.DBContext{ID: _blockStoreID, Config: config}) + blockStoreDB, err = dbProvider(&cfg.DBContext{ID: "blockstore", Config: config}) if err != nil { return } blockStore = store.NewBlockStore(blockStoreDB) - stateDB, err = dbProvider(&cfg.DBContext{ID: _stateStoreID, Config: config}) + stateDB, err = dbProvider(&cfg.DBContext{ID: "state", Config: config}) return } @@ -80,10 +78,58 @@ func createAndStartIndexerService( logger log.Logger, chainID string, ) (*indexer.Service, []indexer.EventSink, error) { - eventSinks, err := sink.EventSinksFromConfig(config, dbProvider, chainID) - if err != nil { - return nil, nil, err + + eventSinks := []indexer.EventSink{} + + // check for duplicated sinks + sinks := map[string]bool{} + for _, s := range config.TxIndex.Indexer { + sl := strings.ToLower(s) + if sinks[sl] { + return nil, nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml") + } + + sinks[sl] = true } + +loop: + for k := range sinks { + switch k { + case string(indexer.NULL): + // When we see null in the config, the eventsinks will be reset with the + // nullEventSink. + eventSinks = []indexer.EventSink{null.NewEventSink()} + break loop + + case string(indexer.KV): + store, err := dbProvider(&cfg.DBContext{ID: "tx_index", Config: config}) + if err != nil { + return nil, nil, err + } + + eventSinks = append(eventSinks, kv.NewEventSink(store)) + + case string(indexer.PSQL): + conn := config.TxIndex.PsqlConn + if conn == "" { + return nil, nil, errors.New("the psql connection settings cannot be empty") + } + + es, _, err := psql.NewEventSink(conn, chainID) + if err != nil { + return nil, nil, err + } + eventSinks = append(eventSinks, es) + + default: + return nil, nil, errors.New("unsupported event sink type") + } + } + + if len(eventSinks) == 0 { + eventSinks = []indexer.EventSink{null.NewEventSink()} + } + indexerService := indexer.NewIndexerService(eventSinks, eventBus) indexerService.SetLogger(logger.With("module", "txindex"))