From b487feba4269660b18252695df9e2e79ca635724 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 21 Nov 2018 21:24:13 +0400 Subject: [PATCH] node: refactor privValidator ext client code & tests (#2895) * update ConsensusState#OnStop comment * consensus: set logger for WAL in tests * refactor privValidator client code and tests follow-up on https://github.com/tendermint/tendermint/pull/2866 --- CHANGELOG_PENDING.md | 2 + consensus/replay_test.go | 17 ++---- consensus/state.go | 3 +- consensus/wal_test.go | 29 +++++------ node/node.go | 86 ++++++++++++++---------------- node/node_test.go | 109 ++++++++++++++++----------------------- privval/ipc_server.go | 1 + 7 files changed, 105 insertions(+), 142 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 919569d44..0aa93afb8 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -2,6 +2,8 @@ ## v0.26.4 +*TBD* + Special thanks to external contributors on this release: Friendly reminder, we have a [bug bounty diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 70c4ba332..c261426c1 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -315,28 +315,21 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { config := ResetConfig("proxy_test_") walBody, err := WALWithNBlocks(NUM_BLOCKS) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) walFile := tempWALWithData(walBody) config.Consensus.SetWalFile(walFile) privVal := privval.LoadFilePV(config.PrivValidatorFile()) wal, err := NewWAL(walFile) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) wal.SetLogger(log.TestingLogger()) - if err := wal.Start(); err != nil { - t.Fatal(err) - } + err = wal.Start() + require.NoError(t, err) defer wal.Stop() chain, commits, err := makeBlockchainFromWAL(wal) - if err != nil { - t.Fatalf(err.Error()) - } + require.NoError(t, err) stateDB, state, store := stateAndStore(config, privVal.GetPubKey(), kvstore.ProtocolVersion) store.chain = chain diff --git a/consensus/state.go b/consensus/state.go index 0f7b56bc5..4b7aec2af 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -324,10 +324,11 @@ func (cs *ConsensusState) startRoutines(maxSteps int) { go cs.receiveRoutine(maxSteps) } -// OnStop implements cmn.Service. It stops all routines and waits for the WAL to finish. +// OnStop implements cmn.Service. func (cs *ConsensusState) OnStop() { cs.evsw.Stop() cs.timeoutTicker.Stop() + // WAL is stopped in receiveRoutine. } // Wait waits for the the main routine to return. diff --git a/consensus/wal_test.go b/consensus/wal_test.go index c45f6acee..c056f2017 100644 --- a/consensus/wal_test.go +++ b/consensus/wal_test.go @@ -7,13 +7,13 @@ import ( "io/ioutil" "os" "path/filepath" - // "sync" "testing" "time" "github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/libs/autofile" + "github.com/tendermint/tendermint/libs/log" tmtypes "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" @@ -23,29 +23,27 @@ import ( func TestWALTruncate(t *testing.T) { walDir, err := ioutil.TempDir("", "wal") - if err != nil { - panic(fmt.Errorf("failed to create temp WAL file: %v", err)) - } + require.NoError(t, err) defer os.RemoveAll(walDir) walFile := filepath.Join(walDir, "wal") //this magic number 4K can truncate the content when RotateFile. defaultHeadSizeLimit(10M) is hard to simulate. //this magic number 1 * time.Millisecond make RotateFile check frequently. defaultGroupCheckDuration(5s) is hard to simulate. - wal, err := NewWAL(walFile, autofile.GroupHeadSizeLimit(4096), autofile.GroupCheckDuration(1*time.Millisecond)) - if err != nil { - t.Fatal(err) - } - - wal.Start() + wal, err := NewWAL(walFile, + autofile.GroupHeadSizeLimit(4096), + autofile.GroupCheckDuration(1*time.Millisecond), + ) + require.NoError(t, err) + wal.SetLogger(log.TestingLogger()) + err = wal.Start() + require.NoError(t, err) defer wal.Stop() //60 block's size nearly 70K, greater than group's headBuf size(4096 * 10), when headBuf is full, truncate content will Flush to the file. //at this time, RotateFile is called, truncate content exist in each file. err = WALGenerateNBlocks(wal.Group(), 60) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) time.Sleep(1 * time.Millisecond) //wait groupCheckDuration, make sure RotateFile run @@ -99,9 +97,8 @@ func TestWALSearchForEndHeight(t *testing.T) { walFile := tempWALWithData(walBody) wal, err := NewWAL(walFile) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + wal.SetLogger(log.TestingLogger()) h := int64(3) gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{}) diff --git a/node/node.go b/node/node.go index a15dc2486..8e41dfd11 100644 --- a/node/node.go +++ b/node/node.go @@ -3,7 +3,6 @@ package node import ( "bytes" "context" - "errors" "fmt" "net" "net/http" @@ -11,11 +10,12 @@ import ( "strings" "time" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/cors" - "github.com/tendermint/go-amino" + amino "github.com/tendermint/go-amino" abci "github.com/tendermint/tendermint/abci/types" bc "github.com/tendermint/tendermint/blockchain" cfg "github.com/tendermint/tendermint/config" @@ -148,44 +148,6 @@ type Node struct { prometheusSrv *http.Server } -func createExternalPrivValidator(listenAddr string, logger log.Logger) (types.PrivValidator, error) { - protocol, address := cmn.ProtocolAndAddress(listenAddr) - - var pvsc types.PrivValidator - - switch (protocol) { - case "unix": - pvsc = privval.NewIPCVal( - logger.With("module", "privval"), - address, - ) - - case "tcp": - // TODO: persist this key so external signer - // can actually authenticate us - pvsc = privval.NewTCPVal( - logger.With("module", "privval"), - listenAddr, - ed25519.GenPrivKey(), - ) - - default: - return nil, fmt.Errorf( - "Error creating private validator: expected either tcp or unix "+ - "protocols, got %s", - protocol, - ) - } - - pvServ, _ := pvsc.(cmn.Service) - if err := pvServ.Start(); err != nil { - return nil, fmt.Errorf("Error starting private validator client: %v", err) - } - - return pvsc, nil - -} - // NewNode returns a new, ready to go, Tendermint Node. func NewNode(config *cfg.Config, privValidator types.PrivValidator, @@ -259,11 +221,12 @@ func NewNode(config *cfg.Config, } if config.PrivValidatorListenAddr != "" { - // If an address is provided, listen on the socket for a - // connection from an external signing process. - privValidator, err = createExternalPrivValidator(config.PrivValidatorListenAddr, logger) + // If an address is provided, listen on the socket for a connection from an + // external signing process. + // FIXME: we should start services inside OnStart + privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, logger) if err != nil { - return nil, err + return nil, errors.Wrap(err, "Error with private validator socket client") } } @@ -626,11 +589,8 @@ func (n *Node) OnStop() { } } - if pvsc, ok := n.privValidator.(cmn.Service); ok { - if err := pvsc.Stop(); err != nil { - n.Logger.Error("Error stopping priv validator client", "err", err) - } + pvsc.Stop() } if n.prometheusSrv != nil { @@ -884,6 +844,36 @@ func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) { db.SetSync(genesisDocKey, bytes) } +func createAndStartPrivValidatorSocketClient( + listenAddr string, + logger log.Logger, +) (types.PrivValidator, error) { + var pvsc types.PrivValidator + + protocol, address := cmn.ProtocolAndAddress(listenAddr) + switch protocol { + case "unix": + pvsc = privval.NewIPCVal(logger.With("module", "privval"), address) + case "tcp": + // TODO: persist this key so external signer + // can actually authenticate us + pvsc = privval.NewTCPVal(logger.With("module", "privval"), listenAddr, ed25519.GenPrivKey()) + default: + return nil, fmt.Errorf( + "Wrong listen address: expected either 'tcp' or 'unix' protocols, got %s", + protocol, + ) + } + + if pvsc, ok := pvsc.(cmn.Service); ok { + if err := pvsc.Start(); err != nil { + return nil, errors.Wrap(err, "failed to start") + } + } + + return pvsc, nil +} + // splitAndTrimEmpty slices s into all subslices separated by sep and returns a // slice of the string s with all leading and trailing Unicode code points // contained in cutset removed. If sep is empty, SplitAndTrim splits after each diff --git a/node/node_test.go b/node/node_test.go index 180f5d9c8..4d0019eac 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -3,28 +3,28 @@ package node import ( "context" "fmt" + "net" "os" "syscall" "testing" "time" - "net" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/abci/example/kvstore" + "github.com/tendermint/tendermint/crypto/ed25519" + cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/version" - "github.com/tendermint/tendermint/crypto/ed25519" cfg "github.com/tendermint/tendermint/config" - cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/types" - tmtime "github.com/tendermint/tendermint/types/time" "github.com/tendermint/tendermint/privval" + tmtime "github.com/tendermint/tendermint/types/time" ) func TestNodeStartStop(t *testing.T) { @@ -32,17 +32,16 @@ func TestNodeStartStop(t *testing.T) { // create & start node n, err := DefaultNewNode(config, log.TestingLogger()) - assert.NoError(t, err, "expected no err on DefaultNewNode") - err1 := n.Start() - if err1 != nil { - t.Error(err1) - } + require.NoError(t, err) + err = n.Start() + require.NoError(t, err) + t.Logf("Started node %v", n.sw.NodeInfo()) // wait for the node to produce a block blockCh := make(chan interface{}) err = n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock, blockCh) - assert.NoError(t, err) + require.NoError(t, err) select { case <-blockCh: case <-time.After(10 * time.Second): @@ -94,7 +93,7 @@ func TestNodeDelayedStop(t *testing.T) { // create & start node n, err := DefaultNewNode(config, log.TestingLogger()) n.GenesisDoc().GenesisTime = now.Add(5 * time.Second) - assert.NoError(t, err) + require.NoError(t, err) n.Start() startTime := tmtime.Now() @@ -106,7 +105,7 @@ func TestNodeSetAppVersion(t *testing.T) { // create & start node n, err := DefaultNewNode(config, log.TestingLogger()) - assert.NoError(t, err, "expected no err on DefaultNewNode") + require.NoError(t, err) // default config uses the kvstore app var appVersion version.Protocol = kvstore.ProtocolVersion @@ -122,91 +121,71 @@ func TestNodeSetAppVersion(t *testing.T) { func TestNodeSetPrivValTCP(t *testing.T) { addr := "tcp://" + testFreeAddr(t) + config := cfg.ResetTestRoot("node_priv_val_tcp_test") + config.BaseConfig.PrivValidatorListenAddr = addr + rs := privval.NewRemoteSigner( log.TestingLogger(), - cmn.RandStr(12), + config.ChainID(), addr, types.NewMockPV(), ed25519.GenPrivKey(), ) privval.RemoteSignerConnDeadline(5 * time.Millisecond)(rs) - privval.RemoteSignerConnRetries(1e6)(rs) - - config := cfg.ResetTestRoot("node_priv_val_tcp_test") - config.BaseConfig.PrivValidatorListenAddr = addr - - // kick off remote signer routine, and then start TM. - go func(rs *privval.RemoteSigner) { - rs.Start() - defer rs.Stop() - time.Sleep(100 * time.Millisecond) - }(rs) + go func() { + err := rs.Start() + if err != nil { + panic(err) + } + }() + defer rs.Stop() n, err := DefaultNewNode(config, log.TestingLogger()) - - assert.NoError(t, err, "expected no err on DefaultNewNode") - + require.NoError(t, err) assert.IsType(t, &privval.TCPVal{}, n.PrivValidator()) } -func TestNodeSetPrivValTCPNoPrefix(t *testing.T) { - addr := "tcp://" + testFreeAddr(t) +// address without a protocol must result in error +func TestPrivValidatorListenAddrNoProtocol(t *testing.T) { + addrNoPrefix := testFreeAddr(t) - rs := privval.NewRemoteSigner( - log.TestingLogger(), - cmn.RandStr(12), - addr, - types.NewMockPV(), - ed25519.GenPrivKey(), - ) - privval.RemoteSignerConnDeadline(5 * time.Millisecond)(rs) - privval.RemoteSignerConnRetries(1e6)(rs) config := cfg.ResetTestRoot("node_priv_val_tcp_test") - config.BaseConfig.PrivValidatorListenAddr = addr + config.BaseConfig.PrivValidatorListenAddr = addrNoPrefix - // kick off remote signer routine, and then start TM. - go func(rs *privval.RemoteSigner) { - rs.Start() - defer rs.Stop() - time.Sleep(100 * time.Millisecond) - }(rs) - - n, err := DefaultNewNode(config, log.TestingLogger()) - - assert.NoError(t, err, "expected no err on DefaultNewNode") - assert.IsType(t, &privval.TCPVal{}, n.PrivValidator()) + _, err := DefaultNewNode(config, log.TestingLogger()) + assert.Error(t, err) } func TestNodeSetPrivValIPC(t *testing.T) { tmpfile := "/tmp/kms." + cmn.RandStr(6) + ".sock" defer os.Remove(tmpfile) // clean up - addr := "unix://" + tmpfile + + config := cfg.ResetTestRoot("node_priv_val_tcp_test") + config.BaseConfig.PrivValidatorListenAddr = "unix://" + tmpfile rs := privval.NewIPCRemoteSigner( log.TestingLogger(), - cmn.RandStr(12), + config.ChainID(), tmpfile, types.NewMockPV(), ) - privval.IPCRemoteSignerConnDeadline(3 * time.Second)(rs) - // kick off remote signer routine, and then start TM. - go func(rs *privval.IPCRemoteSigner) { - rs.Start() - defer rs.Stop() - time.Sleep(500 * time.Millisecond) - }(rs) + done := make(chan struct{}) + go func() { + defer close(done) + n, err := DefaultNewNode(config, log.TestingLogger()) + require.NoError(t, err) + assert.IsType(t, &privval.IPCVal{}, n.PrivValidator()) + }() - config := cfg.ResetTestRoot("node_priv_val_tcp_test") - config.BaseConfig.PrivValidatorListenAddr = addr - n, err := DefaultNewNode(config, log.TestingLogger()) + err := rs.Start() + require.NoError(t, err) + defer rs.Stop() - assert.NoError(t, err, "expected no err on DefaultNewNode") - assert.IsType(t, &privval.IPCVal{}, n.PrivValidator()) + <-done } - // testFreeAddr claims a free port so we don't block on listener being ready. func testFreeAddr(t *testing.T) string { ln, err := net.Listen("tcp", "127.0.0.1:0") diff --git a/privval/ipc_server.go b/privval/ipc_server.go index d3907cbdb..ba9574771 100644 --- a/privval/ipc_server.go +++ b/privval/ipc_server.go @@ -69,6 +69,7 @@ func (rs *IPCRemoteSigner) OnStart() error { for { conn, err := rs.listener.AcceptUnix() if err != nil { + rs.Logger.Error("AcceptUnix", "err", err) return } go rs.handleConnection(conn)