Merge branch 'master' into marko/int64-

This commit is contained in:
Marko Baricevic
2021-03-15 12:03:26 +00:00
74 changed files with 1215 additions and 468 deletions

View File

@@ -44,7 +44,7 @@ jobs:
steps:
- uses: actions/setup-go@v2
with:
go-version: "1.15"
go-version: "1.16"
- uses: actions/checkout@v2
- uses: technote-space/get-diff-action@v4
with:
@@ -66,7 +66,7 @@ jobs:
steps:
- uses: actions/setup-go@v2
with:
go-version: "1.15"
go-version: "1.16"
- uses: actions/checkout@v2
- uses: technote-space/get-diff-action@v4
with:
@@ -81,7 +81,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.15
go-version: 1.16
- name: test & coverage report creation
run: |
cat pkgs.txt.part.${{ matrix.part }} | xargs go test -mod=readonly -timeout 8m -race -coverprofile=${{ matrix.part }}profile.out -covermode=atomic

View File

@@ -25,7 +25,7 @@ jobs:
make build-docs
- name: Deploy 🚀
uses: JamesIves/github-pages-deploy-action@4.0.0
uses: JamesIves/github-pages-deploy-action@4.1.0
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
BRANCH: gh-pages

View File

@@ -23,7 +23,7 @@ jobs:
steps:
- uses: actions/setup-go@v2
with:
go-version: '1.15'
go-version: '1.16'
- uses: actions/checkout@v2
with:
@@ -49,7 +49,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Notify Slack on failure
uses: rtCamp/action-slack-notify@ae4223259071871559b6e9d08b24a63d71b3f0c0
uses: rtCamp/action-slack-notify@f565a63638bd3615e76249bffab00fcb9dab90f7
env:
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}
SLACK_CHANNEL: tendermint-internal
@@ -65,7 +65,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Notify Slack on success
uses: rtCamp/action-slack-notify@ae4223259071871559b6e9d08b24a63d71b3f0c0
uses: rtCamp/action-slack-notify@f565a63638bd3615e76249bffab00fcb9dab90f7
env:
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}
SLACK_CHANNEL: tendermint-internal

View File

@@ -22,7 +22,7 @@ jobs:
steps:
- uses: actions/setup-go@v2
with:
go-version: '1.15'
go-version: '1.16'
- uses: actions/checkout@v2
@@ -46,7 +46,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Notify Slack on failure
uses: rtCamp/action-slack-notify@ae4223259071871559b6e9d08b24a63d71b3f0c0
uses: rtCamp/action-slack-notify@f565a63638bd3615e76249bffab00fcb9dab90f7
env:
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}
SLACK_CHANNEL: tendermint-internal
@@ -62,7 +62,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Notify Slack on success
uses: rtCamp/action-slack-notify@ae4223259071871559b6e9d08b24a63d71b3f0c0
uses: rtCamp/action-slack-notify@f565a63638bd3615e76249bffab00fcb9dab90f7
env:
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}
SLACK_CHANNEL: tendermint-internal

View File

@@ -16,7 +16,7 @@ jobs:
steps:
- uses: actions/setup-go@v2
with:
go-version: '1.15'
go-version: '1.16'
- uses: actions/checkout@v2
- uses: technote-space/get-diff-action@v4
with:

View File

@@ -4,6 +4,10 @@ on:
workflow_dispatch: # allow running workflow manually
schedule:
- cron: '0 3 * * *'
pull_request:
branches: [master]
paths:
- "test/fuzz/**/*.go"
jobs:
fuzz-nightly-test:
@@ -11,7 +15,7 @@ jobs:
steps:
- uses: actions/setup-go@v2
with:
go-version: '1.15'
go-version: '1.16'
- uses: actions/checkout@v2
@@ -49,14 +53,14 @@ jobs:
with:
name: crashers
path: test/fuzz/**/crashers
retention-days: 1
retention-days: 3
- name: Archive suppressions
uses: actions/upload-artifact@v2
with:
name: suppressions
path: test/fuzz/**/suppressions
retention-days: 1
retention-days: 3
- name: Set crashers count
working-directory: test/fuzz
@@ -72,7 +76,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Notify Slack if any crashers
uses: rtCamp/action-slack-notify@ae4223259071871559b6e9d08b24a63d71b3f0c0
uses: rtCamp/action-slack-notify@f565a63638bd3615e76249bffab00fcb9dab90f7
env:
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}
SLACK_CHANNEL: tendermint-internal

View File

@@ -18,7 +18,7 @@ jobs:
- uses: actions/setup-go@v2
with:
go-version: '1.15'
go-version: '1.16'
- run: echo https://github.com/tendermint/tendermint/blob/${GITHUB_REF#refs/tags/}/CHANGELOG.md#${GITHUB_REF#refs/tags/} > ../release_notes.md
if: startsWith(github.ref, 'refs/tags/')

View File

@@ -25,7 +25,7 @@ jobs:
steps:
- uses: actions/setup-go@v2
with:
go-version: "1.15"
go-version: "1.16"
- uses: actions/checkout@v2
- uses: technote-space/get-diff-action@v4
with:
@@ -57,7 +57,7 @@ jobs:
steps:
- uses: actions/setup-go@v2
with:
go-version: "1.15"
go-version: "1.16"
- uses: actions/checkout@v2
- uses: technote-space/get-diff-action@v4
with:
@@ -89,7 +89,7 @@ jobs:
steps:
- uses: actions/setup-go@v2
with:
go-version: "1.15"
go-version: "1.16"
- uses: actions/checkout@v2
- uses: technote-space/get-diff-action@v4
with:
@@ -120,7 +120,7 @@ jobs:
steps:
- uses: actions/setup-go@v2
with:
go-version: "1.15"
go-version: "1.16"
- uses: actions/checkout@v2
- uses: technote-space/get-diff-action@v4
with:

View File

@@ -1,5 +1,22 @@
# Changelog
## v0.34.8
*February 25, 2021*
This release, in conjunction with [a fix in the Cosmos SDK](https://github.com/cosmos/cosmos-sdk/pull/8641),
introduces changes that should mean the logs are much, much quieter. 🎉
Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermint).
### IMPROVEMENTS
- [libs/log] [\#6174](https://github.com/tendermint/tendermint/issues/6174) Include timestamp (`ts` field; `time.RFC3339Nano` format) in JSON logger output (@melekes)
### BUG FIXES
- [abci] [\#6124](https://github.com/tendermint/tendermint/issues/6124) Fixes a panic condition during callback execution in `ReCheckTx` during high tx load. (@alexanderbez)
## v0.34.7
*February 18, 2021*

View File

@@ -4,7 +4,7 @@
Special thanks to external contributors on this release:
Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermint).
Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermint).
### BREAKING CHANGES
@@ -42,6 +42,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [rpc/client/http] \#6176 Remove `endpoint` arg from `New`, `NewWithTimeout` and `NewWithClient` (@melekes)
- [rpc/client/http] \#6176 Unexpose `WSEvents` (@melekes)
- [rpc/jsonrpc/client/ws_client] \#6176 `NewWS` no longer accepts options (use `NewWSWithOptions` and `OnReconnect` funcs to configure the client) (@melekes)
- [rpc/jsonrpc/server] \#6204 Modify `WriteRPCResponseHTTP(Error)` to return an error (@melekes)
- Blockchain Protocol
@@ -71,12 +72,12 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- [rpc/client/http] \#6163 Do not drop events even if the `out` channel is full (@melekes)
- [node] \#6059 Validate and complete genesis doc before saving to state store (@silasdavis)
- [state] \#6067 Batch save state data (@githubsands & @cmwaters)
- [libs/log] \#6174 Include timestamp (`ts` field; `time.RFC3339Nano` format) in JSON logger output (@melekes)
- [privval/file] \#6185 Return error on `LoadFilePV`, `LoadFilePVEmptyState`. Allows for better programmatic control of Tendermint.
### BUG FIXES
- [ABCI] \#6124 Fixes a panic condition during callback execution in `ReCheckTx` during high tx load. (@alexanderbez)
- [types] \#5523 Change json naming of `PartSetHeader` within `BlockID` from `parts` to `part_set_header` (@marbar3778)
- [privval] \#5638 Increase read/write timeout to 5s and calculate ping interval based on it (@JoeKash)
- [blockchain/v1] [\#5701](https://github.com/tendermint/tendermint/pull/5701) Handle peers without blocks (@melekes)
- [blockchain/v1] \#5711 Fix deadlock (@melekes)
- [rpc/jsonrpc/server] \#6191 Correctly unmarshal `RPCRequest` when data is `null` (@melekes)

View File

@@ -64,7 +64,11 @@ func main() {
"rootCA", *rootCA,
)
pv := privval.LoadFilePV(*privValKeyPath, *privValStatePath)
pv, err := privval.LoadFilePV(*privValKeyPath, *privValStatePath)
if err != nil {
fmt.Fprint(os.Stderr, err)
os.Exit(1)
}
opts := []grpc.ServerOption{}
if !*insecure {

View File

@@ -43,7 +43,11 @@ func initFilesWithConfig(config *cfg.Config) error {
err error
)
if tmos.FileExists(privValKeyFile) {
pv = privval.LoadFilePV(privValKeyFile, privValStateFile)
pv, err = privval.LoadFilePV(privValKeyFile, privValStateFile)
if err != nil {
return err
}
logger.Info("Found private validator", "keyFile", privValKeyFile,
"stateFile", privValStateFile)
} else {

View File

@@ -8,7 +8,6 @@ import (
"net/http"
"os"
"path/filepath"
"regexp"
"strings"
"time"
@@ -16,7 +15,6 @@ import (
dbm "github.com/tendermint/tm-db"
"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/libs/log"
tmmath "github.com/tendermint/tendermint/libs/math"
tmos "github.com/tendermint/tendermint/libs/os"
@@ -24,7 +22,6 @@ import (
lproxy "github.com/tendermint/tendermint/light/proxy"
lrpc "github.com/tendermint/tendermint/light/rpc"
dbs "github.com/tendermint/tendermint/light/store/db"
rpchttp "github.com/tendermint/tendermint/rpc/client/http"
rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server"
)
@@ -217,17 +214,11 @@ func runProxy(cmd *cobra.Command, args []string) error {
cfg.WriteTimeout = config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
}
rpcClient, err := rpchttp.NewWithTimeout(primaryAddr, cfg.WriteTimeout)
p, err := lproxy.NewProxy(c, listenAddr, primaryAddr, cfg, logger, lrpc.KeyPathFn(lrpc.DefaultMerkleKeyPathFn()))
if err != nil {
return fmt.Errorf("failed to create http client for %s: %w", primaryAddr, err)
return err
}
p := lproxy.Proxy{
Addr: listenAddr,
Config: cfg,
Client: lrpc.NewClient(rpcClient, c, lrpc.KeyPathFn(defaultMerkleKeyPathFn())),
Logger: logger,
}
// Stop upon receiving SIGTERM or CTRL-C.
tmos.TrapSignal(logger, func() {
p.Listener.Close()
@@ -266,21 +257,3 @@ func saveProviders(db dbm.DB, primaryAddr, witnessesAddrs string) error {
}
return nil
}
func defaultMerkleKeyPathFn() lrpc.KeyPathFunc {
// regexp for extracting store name from /abci_query path
storeNameRegexp := regexp.MustCompile(`\/store\/(.+)\/key`)
return func(path string, key []byte) (merkle.KeyPath, error) {
matches := storeNameRegexp.FindStringSubmatch(path)
if len(matches) != 2 {
return nil, fmt.Errorf("can't find store name in %s using %s", path, storeNameRegexp)
}
storeName := matches[1]
kp := merkle.KeyPath{}
kp = kp.AppendKey([]byte(storeName), merkle.KeyEncodingURL)
kp = kp.AppendKey(key, merkle.KeyEncodingURL)
return kp, nil
}
}

View File

@@ -17,7 +17,7 @@ var ResetAllCmd = &cobra.Command{
Use: "unsafe-reset-all",
Aliases: []string{"unsafe_reset_all"},
Short: "(unsafe) Remove all the data and WAL, reset this node's validator to genesis state",
Run: resetAll,
RunE: resetAll,
PreRun: deprecateSnakeCase,
}
@@ -34,26 +34,26 @@ var ResetPrivValidatorCmd = &cobra.Command{
Use: "unsafe-reset-priv-validator",
Aliases: []string{"unsafe_reset_priv_validator"},
Short: "(unsafe) Reset this node's validator to genesis state",
Run: resetPrivValidator,
RunE: resetPrivValidator,
PreRun: deprecateSnakeCase,
}
// XXX: this is totally unsafe.
// it's only suitable for testnets.
func resetAll(cmd *cobra.Command, args []string) {
ResetAll(config.DBDir(), config.P2P.AddrBookFile(), config.PrivValidatorKeyFile(),
func resetAll(cmd *cobra.Command, args []string) error {
return ResetAll(config.DBDir(), config.P2P.AddrBookFile(), config.PrivValidatorKeyFile(),
config.PrivValidatorStateFile(), logger)
}
// XXX: this is totally unsafe.
// it's only suitable for testnets.
func resetPrivValidator(cmd *cobra.Command, args []string) {
resetFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile(), logger)
func resetPrivValidator(cmd *cobra.Command, args []string) error {
return resetFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile(), logger)
}
// ResetAll removes address book files plus all data, and resets the privValidator data.
// Exported so other CLI tools can use it.
func ResetAll(dbDir, addrBookFile, privValKeyFile, privValStateFile string, logger log.Logger) {
func ResetAll(dbDir, addrBookFile, privValKeyFile, privValStateFile string, logger log.Logger) error {
if keepAddrBook {
logger.Info("The address book remains intact")
} else {
@@ -68,24 +68,28 @@ func ResetAll(dbDir, addrBookFile, privValKeyFile, privValStateFile string, logg
if err := tmos.EnsureDir(dbDir, 0700); err != nil {
logger.Error("unable to recreate dbDir", "err", err)
}
resetFilePV(privValKeyFile, privValStateFile, logger)
return resetFilePV(privValKeyFile, privValStateFile, logger)
}
func resetFilePV(privValKeyFile, privValStateFile string, logger log.Logger) {
func resetFilePV(privValKeyFile, privValStateFile string, logger log.Logger) error {
if _, err := os.Stat(privValKeyFile); err == nil {
pv := privval.LoadFilePVEmptyState(privValKeyFile, privValStateFile)
pv, err := privval.LoadFilePVEmptyState(privValKeyFile, privValStateFile)
if err != nil {
return err
}
pv.Reset()
logger.Info("Reset private validator file to genesis state", "keyFile", privValKeyFile,
"stateFile", privValStateFile)
} else {
pv, err := privval.GenFilePV(privValKeyFile, privValStateFile, keyType)
if err != nil {
panic(err)
return err
}
pv.Save()
logger.Info("Generated private validator file", "keyFile", privValKeyFile,
"stateFile", privValStateFile)
}
return nil
}
func removeAddrBook(addrBookFile string, logger log.Logger) {

View File

@@ -47,7 +47,10 @@ func showValidator(cmd *cobra.Command, args []string) error {
return fmt.Errorf("private validator file %s does not exist", keyFilePath)
}
pv := privval.LoadFilePV(keyFilePath, config.PrivValidatorStateFile())
pv, err := privval.LoadFilePV(keyFilePath, config.PrivValidatorStateFile())
if err != nil {
return err
}
pubKey, err = pv.GetPubKey()
if err != nil {

View File

@@ -144,7 +144,10 @@ func testnetFiles(cmd *cobra.Command, args []string) error {
pvKeyFile := filepath.Join(nodeDir, config.BaseConfig.PrivValidatorKey)
pvStateFile := filepath.Join(nodeDir, config.BaseConfig.PrivValidatorState)
pv := privval.LoadFilePV(pvKeyFile, pvStateFile)
pv, err := privval.LoadFilePV(pvKeyFile, pvStateFile)
if err != nil {
return err
}
pubKey, err := pv.GetPubKey()
if err != nil {
@@ -204,11 +207,11 @@ func testnetFiles(cmd *cobra.Command, args []string) error {
// Gather persistent peer addresses.
var (
persistentPeers string
persistentPeers = make([]string, 0)
err error
)
if populatePersistentPeers {
persistentPeers, err = persistentPeersString(config)
persistentPeers, err = persistentPeersArray(config)
if err != nil {
_ = os.RemoveAll(outputDir)
return err
@@ -222,7 +225,14 @@ func testnetFiles(cmd *cobra.Command, args []string) error {
config.P2P.AddrBookStrict = false
config.P2P.AllowDuplicateIP = true
if populatePersistentPeers {
config.P2P.PersistentPeers = persistentPeers
persistentPeersWithoutSelf := make([]string, 0)
for j := 0; j < len(persistentPeers); j++ {
if j == i {
continue
}
persistentPeersWithoutSelf = append(persistentPeersWithoutSelf, persistentPeers[j])
}
config.P2P.PersistentPeers = strings.Join(persistentPeersWithoutSelf, ",")
}
config.Moniker = moniker(i)
@@ -253,18 +263,19 @@ func hostnameOrIP(i int) string {
return ip.String()
}
func persistentPeersString(config *cfg.Config) (string, error) {
persistentPeers := make([]string, nValidators+nNonValidators)
// get an array of persistent peers
func persistentPeersArray(config *cfg.Config) ([]string, error) {
peers := make([]string, nValidators+nNonValidators)
for i := 0; i < nValidators+nNonValidators; i++ {
nodeDir := filepath.Join(outputDir, fmt.Sprintf("%s%d", nodeDirPrefix, i))
config.SetRoot(nodeDir)
nodeKey, err := p2p.LoadNodeKey(config.NodeKeyFile())
if err != nil {
return "", err
return []string{}, err
}
persistentPeers[i] = p2p.IDAddressString(nodeKey.ID, fmt.Sprintf("%s:%d", hostnameOrIP(i), p2pPort))
peers[i] = p2p.IDAddressString(nodeKey.ID, fmt.Sprintf("%s:%d", hostnameOrIP(i), p2pPort))
}
return strings.Join(persistentPeers, ","), nil
return peers, nil
}
func moniker(i int) string {

View File

@@ -672,7 +672,8 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin
walFile := tempWALWithData(walBody)
config.Consensus.SetWalFile(walFile)
privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
privVal, err := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
require.NoError(t, err)
wal, err := NewWAL(walFile)
require.NoError(t, err)
@@ -884,7 +885,8 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
// - 0x03
config := ResetConfig("handshake_test_")
t.Cleanup(func() { os.RemoveAll(config.RootDir) })
privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
privVal, err := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
require.NoError(t, err)
const appVersion = 0x0
pubKey, err := privVal.GetPubKey()
require.NoError(t, err)
@@ -1220,7 +1222,8 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
config := ResetConfig("handshake_test_")
t.Cleanup(func() { _ = os.RemoveAll(config.RootDir) })
privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
privVal, err := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
require.NoError(t, err)
pubKey, err := privVal.GetPubKey()
require.NoError(t, err)
stateDB, state, store := stateAndStore(config, pubKey, 0x0)

View File

@@ -1147,8 +1147,7 @@ func (cs *State) defaultDecideProposal(height uint64, round int32) {
cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""})
}
cs.Logger.Info("signed proposal", "height", height, "round", round, "proposal", proposal)
cs.Logger.Debug("signed proposal block", "block", block)
cs.Logger.Debug("signed proposal", "height", height, "round", round, "proposal", proposal)
} else if !cs.replayMode {
cs.Logger.Error("propose step; failed signing proposal", "height", height, "round", round, "err", err)
}
@@ -1661,7 +1660,7 @@ func (cs *State) finalizeCommit(height uint64) {
if err != nil {
logger.Error("failed to prune blocks", "retain_height", retainHeight, "err", err)
} else {
logger.Info("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
logger.Debug("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
}
}
@@ -1960,7 +1959,7 @@ func (cs *State) tryAddVote(vote *types.Vote, peerID p2p.NodeID) (bool, error) {
// report conflicting votes to the evidence pool
cs.evpool.ReportConflictingVotes(voteErr.VoteA, voteErr.VoteB)
cs.Logger.Info(
cs.Logger.Debug(
"found and sent conflicting votes to the evidence pool",
"vote_a", voteErr.VoteA,
"vote_b", voteErr.VoteB,
@@ -2124,7 +2123,7 @@ func (cs *State) addVote(vote *types.Vote, peerID p2p.NodeID) (added bool, err e
case tmproto.PrecommitType:
precommits := cs.Votes.Precommits(vote.Round)
cs.Logger.Info("added vote to precommit", "vote", vote, "precommits", precommits.StringShort())
cs.Logger.Debug("added vote to precommit", "vote", vote, "precommits", precommits.StringShort())
blockID, ok := precommits.TwoThirdsMajority()
if ok {
@@ -2233,7 +2232,7 @@ func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash []byte, header
vote, err := cs.signVote(msgType, hash, header)
if err == nil {
cs.sendInternalMessage(msgInfo{&VoteMessage{vote}, ""})
cs.Logger.Info("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote)
cs.Logger.Debug("signed and pushed vote", "height", cs.Height, "round", cs.Round, "vote", vote)
return vote
}

View File

@@ -336,7 +336,6 @@ Put the following code into the "main.go" file:
package main
import (
"errors"
"flag"
"fmt"
"os"
@@ -410,7 +409,7 @@ func newTendermint(app abci.Application, configFile string) (*nm.Node, error) {
// create logger
logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))
var err error
logger, err = tmflags.ParseLogLevel(config.LogLevel, logger, cfg.DefaultLogLevel())
logger, err = tmflags.ParseLogLevel(config.LogLevel, logger, cfg.DefaultLogLevel)
if err != nil {
return nil, fmt.Errorf("failed to parse log level: %w", err)
}

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"sort"
"sync"
"time"
"github.com/tendermint/tendermint/libs/log"
@@ -126,8 +127,10 @@ type Client struct {
pruningSize uint16
// See ConfirmationFunction option
confirmationFn func(action string) bool
quit chan struct{}
// The light client keeps track of how many times it has requested a light
// block from its providers. When this exceeds the number of witnesses, the
// light client will just return the last error sent by the providers
// repeatRequests uint16
logger log.Logger
}
@@ -161,14 +164,14 @@ func NewClient(
}
if c.latestTrustedBlock != nil {
c.logger.Info("Checking trusted light block using options")
c.logger.Info("checking trusted light block using options")
if err := c.checkTrustedHeaderUsingOptions(ctx, trustOptions); err != nil {
return nil, err
}
}
if c.latestTrustedBlock == nil || c.latestTrustedBlock.Height < trustOptions.Height {
c.logger.Info("Downloading trusted light block using options")
c.logger.Info("downloading trusted light block using options")
if err := c.initializeWithTrustOptions(ctx, trustOptions); err != nil {
return nil, err
}
@@ -199,7 +202,6 @@ func NewClientFromTrustedStore(
trustedStore: trustedStore,
pruningSize: defaultPruningSize,
confirmationFn: func(action string) bool { return true },
quit: make(chan struct{}),
logger: log.NewNopLogger(),
}
@@ -237,7 +239,7 @@ func (c *Client) restoreTrustedLightBlock() error {
return fmt.Errorf("can't get last trusted light block: %w", err)
}
c.latestTrustedBlock = trustedBlock
c.logger.Info("Restored trusted light block", "height", lastHeight)
c.logger.Info("restored trusted light block", "height", lastHeight)
}
return nil
@@ -273,7 +275,7 @@ func (c *Client) checkTrustedHeaderUsingOptions(ctx context.Context, options Tru
case options.Height == c.latestTrustedBlock.Height:
primaryHash = options.Hash
case options.Height < c.latestTrustedBlock.Height:
c.logger.Info("Client initialized with old header (trusted is more recent)",
c.logger.Info("client initialized with old header (trusted is more recent)",
"old", options.Height,
"trustedHeight", c.latestTrustedBlock.Height,
"trustedHash", c.latestTrustedBlock.Hash())
@@ -299,11 +301,11 @@ func (c *Client) checkTrustedHeaderUsingOptions(ctx context.Context, options Tru
}
if !bytes.Equal(primaryHash, c.latestTrustedBlock.Hash()) {
c.logger.Info("Prev. trusted header's hash (h1) doesn't match hash from primary provider (h2)",
c.logger.Info("previous trusted header's hash (h1) doesn't match hash from primary provider (h2)",
"h1", c.latestTrustedBlock.Hash(), "h2", primaryHash)
action := fmt.Sprintf(
"Prev. trusted header's hash %X doesn't match hash %X from primary provider. Remove all the stored light blocks?",
"Previous trusted header's hash %X doesn't match hash %X from primary provider. Remove all the stored light blocks?",
c.latestTrustedBlock.Hash(), primaryHash)
if c.confirmationFn(action) {
err := c.Cleanup()
@@ -413,7 +415,7 @@ func (c *Client) Update(ctx context.Context, now time.Time) (*types.LightBlock,
if err != nil {
return nil, err
}
c.logger.Info("Advanced to new state", "height", latestBlock.Height, "hash", latestBlock.Hash())
c.logger.Info("advanced to new state", "height", latestBlock.Height, "hash", latestBlock.Hash())
return latestBlock, nil
}
@@ -441,7 +443,7 @@ func (c *Client) VerifyLightBlockAtHeight(
// Check if the light block is already verified.
h, err := c.TrustedLightBlock(height)
if err == nil {
c.logger.Info("Header has already been verified", "height", height, "hash", h.Hash())
c.logger.Debug("header has already been verified", "height", height, "hash", h.Hash())
// Return already trusted light block
return h, nil
}
@@ -498,7 +500,7 @@ func (c *Client) VerifyHeader(ctx context.Context, newHeader *types.Header, now
if !bytes.Equal(l.Hash(), newHeader.Hash()) {
return fmt.Errorf("existing trusted header %X does not match newHeader %X", l.Hash(), newHeader.Hash())
}
c.logger.Info("Header has already been verified",
c.logger.Debug("header has already been verified",
"height", newHeader.Height, "hash", newHeader.Hash())
return nil
}
@@ -517,7 +519,7 @@ func (c *Client) VerifyHeader(ctx context.Context, newHeader *types.Header, now
}
func (c *Client) verifyLightBlock(ctx context.Context, newLightBlock *types.LightBlock, now time.Time) error {
c.logger.Info("VerifyHeader", "height", newLightBlock.Height, "hash", newLightBlock.Hash())
c.logger.Info("verify light block", "height", newLightBlock.Height, "hash", newLightBlock.Hash())
var (
verifyFunc func(ctx context.Context, trusted *types.LightBlock, new *types.LightBlock, now time.Time) error
@@ -562,7 +564,7 @@ func (c *Client) verifyLightBlock(ctx context.Context, newLightBlock *types.Ligh
err = verifyFunc(ctx, closestBlock, newLightBlock, now)
}
if err != nil {
c.logger.Error("Can't verify", "err", err)
c.logger.Error("failed to verify", "err", err)
return err
}
@@ -596,7 +598,7 @@ func (c *Client) verifySequential(
}
// 2) Verify them
c.logger.Debug("Verify adjacent newLightBlock against verifiedBlock",
c.logger.Debug("verify adjacent newLightBlock against verifiedBlock",
"trustedHeight", verifiedBlock.Height,
"trustedHash", verifiedBlock.Hash(),
"newHeight", interimBlock.Height,
@@ -611,32 +613,21 @@ func (c *Client) verifySequential(
case ErrInvalidHeader:
// If the target header is invalid, return immediately.
if err.To == newLightBlock.Height {
c.logger.Debug("Target header is invalid", "err", err)
c.logger.Debug("target header is invalid", "err", err)
return err
}
// If some intermediate header is invalid, replace the primary and try
// again.
c.logger.Error("primary sent invalid header -> replacing", "err", err, "primary", c.primary)
replaceErr := c.replacePrimaryProvider()
if replaceErr != nil {
c.logger.Error("Can't replace primary", "err", replaceErr)
// return original error
return err
}
// If some intermediate header is invalid, remove the primary and try again.
c.logger.Error("primary sent invalid header -> removing", "err", err, "primary", c.primary)
replacementBlock, fErr := c.lightBlockFromPrimary(ctx, newLightBlock.Height)
if fErr != nil {
c.logger.Error("Can't fetch light block from primary", "err", fErr)
// return original error
replacementBlock, removeErr := c.findNewPrimary(ctx, newLightBlock.Height, true)
if removeErr != nil {
c.logger.Debug("failed to replace primary. Returning original error", "err", removeErr)
return err
}
if !bytes.Equal(replacementBlock.Hash(), newLightBlock.Hash()) {
c.logger.Error("Replacement provider has a different light block",
"newHash", newLightBlock.Hash(),
"replHash", replacementBlock.Hash())
// return original error
c.logger.Debug("replaced primary but new primary has a different block to the initial one")
return err
}
@@ -687,7 +678,7 @@ func (c *Client) verifySkipping(
)
for {
c.logger.Debug("Verify non-adjacent newHeader against verifiedBlock",
c.logger.Debug("verify non-adjacent newHeader against verifiedBlock",
"trustedHeight", verifiedBlock.Height,
"trustedHash", verifiedBlock.Hash(),
"newHeight", blockCache[depth].Height,
@@ -717,10 +708,20 @@ func (c *Client) verifySkipping(
pivotHeight := verifiedBlock.Height + (blockCache[depth].Height-verifiedBlock.
Height)*verifySkippingNumerator/verifySkippingDenominator
interimBlock, providerErr := source.LightBlock(ctx, pivotHeight)
if providerErr != nil {
switch providerErr {
case nil:
blockCache = append(blockCache, interimBlock)
// if the error is benign, the client does not need to replace the primary
case provider.ErrLightBlockNotFound, provider.ErrNoResponse:
return nil, err
// all other errors such as ErrBadLightBlock or ErrUnreliableProvider are seen as malevolent and the
// provider is removed
default:
return nil, ErrVerificationFailed{From: verifiedBlock.Height, To: pivotHeight, Reason: providerErr}
}
blockCache = append(blockCache, interimBlock)
}
depth++
@@ -745,32 +746,20 @@ func (c *Client) verifySkippingAgainstPrimary(
// If the target header is invalid, return immediately.
invalidHeaderHeight := err.(ErrVerificationFailed).To
if invalidHeaderHeight == newLightBlock.Height {
c.logger.Debug("Target header is invalid", "err", err)
c.logger.Debug("target header is invalid", "err", err)
return err
}
// If some intermediate header is invalid, replace the primary and try
// again.
// If some intermediate header is invalid, remove the primary and try again.
c.logger.Error("primary sent invalid header -> replacing", "err", err, "primary", c.primary)
replaceErr := c.replacePrimaryProvider()
if replaceErr != nil {
c.logger.Error("Can't replace primary", "err", replaceErr)
// return original error
return err
}
replacementBlock, fErr := c.lightBlockFromPrimary(ctx, newLightBlock.Height)
if fErr != nil {
c.logger.Error("Can't fetch light block from primary", "err", fErr)
// return original error
replacementBlock, removeErr := c.findNewPrimary(ctx, newLightBlock.Height, true)
if removeErr != nil {
c.logger.Error("failed to replace primary. Returning original error", "err", removeErr)
return err
}
if !bytes.Equal(replacementBlock.Hash(), newLightBlock.Hash()) {
c.logger.Error("Replacement provider has a different light block",
"newHash", newLightBlock.Hash(),
"replHash", replacementBlock.Hash())
// return original error
c.logger.Debug("replaced primary but new primary has a different block to the initial one. Returning original error")
return err
}
@@ -836,7 +825,7 @@ func (c *Client) Witnesses() []provider.Provider {
// Cleanup removes all the data (headers and validator sets) stored. Note: the
// client must be stopped at this point.
func (c *Client) Cleanup() error {
c.logger.Info("Removing all light blocks")
c.logger.Info("removing all light blocks")
c.latestTrustedBlock = nil
return c.trustedStore.Prune(0)
}
@@ -873,6 +862,8 @@ func (c *Client) cleanupAfter(height uint64) error {
}
func (c *Client) updateTrustedLightBlock(l *types.LightBlock) error {
c.logger.Debug("updating trusted light block", "light_block", l)
if err := c.trustedStore.SaveLightBlock(l); err != nil {
return fmt.Errorf("failed to save trusted header: %w", err)
}
@@ -909,21 +900,31 @@ func (c *Client) backwards(
return fmt.Errorf("failed to obtain the header at height #%d: %w", verifiedHeader.Height-1, err)
}
interimHeader = interimBlock.Header
c.logger.Debug("Verify newHeader against verifiedHeader",
c.logger.Debug("verify newHeader against verifiedHeader",
"trustedHeight", verifiedHeader.Height,
"trustedHash", verifiedHeader.Hash(),
"newHeight", interimHeader.Height,
"newHash", interimHeader.Hash())
if err := VerifyBackwards(interimHeader, verifiedHeader); err != nil {
c.logger.Error("primary sent invalid header -> replacing", "err", err, "primary", c.primary)
if replaceErr := c.replacePrimaryProvider(); replaceErr != nil {
c.logger.Error("Can't replace primary", "err", replaceErr)
// return original error
return fmt.Errorf("verify backwards from %d to %d failed: %w",
verifiedHeader.Height, interimHeader.Height, err)
// verification has failed
c.logger.Error("backwards verification failed, replacing primary...", "err", err, "primary", c.primary)
// the client tries to see if it can get a witness to continue with the request
newPrimarysBlock, replaceErr := c.findNewPrimary(ctx, newHeader.Height, true)
if replaceErr != nil {
c.logger.Debug("failed to replace primary. Returning original error", "err", replaceErr)
return err
}
// we need to verify the header at the same height again
continue
// before continuing we must check that they have the same target header to validate
if !bytes.Equal(newPrimarysBlock.Hash(), newHeader.Hash()) {
c.logger.Debug("replaced primary but new primary has a different block to the initial one")
// return the original error
return err
}
// try again with the new primary
return c.backwards(ctx, verifiedHeader, newPrimarysBlock.Header)
}
verifiedHeader = interimHeader
}
@@ -931,55 +932,145 @@ func (c *Client) backwards(
return nil
}
// removeWitness deletes the witness at index idx from c.witnesses using
// swap-with-last removal (O(1); does not preserve witness order).
//
// NOTE: requires a providerMutex locked.
func (c *Client) removeWitness(idx int) {
	switch len(c.witnesses) {
	case 0:
		// programmer error: callers must never remove from an empty slice
		panic(fmt.Sprintf("wanted to remove %d element from empty witnesses slice", idx))
	case 1:
		// removing the only witness leaves an empty (non-nil) slice
		c.witnesses = make([]provider.Provider, 0)
	default:
		// overwrite idx with the last element, then shrink the slice by one
		c.witnesses[idx] = c.witnesses[len(c.witnesses)-1]
		c.witnesses = c.witnesses[:len(c.witnesses)-1]
	}
}
// replacePrimaryProvider takes the first witness and promotes it as the
// primary provider, removing it from the witness list.
//
// Returns ErrNoWitnesses unless at least two witnesses remain, so that after
// promotion the client still has at least one witness.
func (c *Client) replacePrimaryProvider() error {
	c.providerMutex.Lock()
	defer c.providerMutex.Unlock()

	// need one witness to promote and at least one to keep as a witness
	if len(c.witnesses) <= 1 {
		return ErrNoWitnesses
	}
	c.primary = c.witnesses[0]
	c.witnesses = c.witnesses[1:]
	c.logger.Info("Replacing primary with the first witness", "new_primary", c.primary)

	return nil
}
// lightBlockFromPrimary retrieves the lightBlock from the primary provider
// at the specified height. Handles dropout by the primary provider by swapping
// with an alternative provider.
// at the specified height. This method also handles provider behavior as follows:
//
// 1. If the provider does not respond or does not have the block, it tries again
// with a different provider
// 2. If all providers return the same error, the light client forwards the error to
// where the initial request came from
// 3. If the provider provides an invalid light block, is deemed unreliable or returns
// any other error, the primary is permanently dropped and is replaced by a witness.
func (c *Client) lightBlockFromPrimary(ctx context.Context, height uint64) (*types.LightBlock, error) {
c.providerMutex.Lock()
l, err := c.primary.LightBlock(ctx, height)
c.providerMutex.Unlock()
if err != nil {
c.logger.Debug("Error on light block request from primary", "error", err, "primary", c.primary)
replaceErr := c.replacePrimaryProvider()
if replaceErr != nil {
return nil, fmt.Errorf("%v. Tried to replace primary but: %w", err.Error(), replaceErr)
}
// replace primary and request a light block again
return c.lightBlockFromPrimary(ctx, height)
switch err {
case nil:
// Everything went smoothly. We reset the lightBlockRequests and return the light block
return l, nil
case provider.ErrNoResponse, provider.ErrLightBlockNotFound:
// we find a new witness to replace the primary
c.logger.Debug("error from light block request from primary, replacing...", "error", err, "primary", c.primary)
return c.findNewPrimary(ctx, height, false)
default:
// The light client has most likely received either provider.ErrUnreliableProvider or provider.ErrBadLightBlock
// These errors mean that the light client should drop the primary and try with another provider instead
c.logger.Error("error from light block request from primary, removing...", "error", err, "primary", c.primary)
return c.findNewPrimary(ctx, height, true)
}
return l, err
}
// compareFirstHeaderWithWitnesses compares h with all witnesses. If any
// removeWitnesses deletes the witnesses at the given indexes from the witness
// list via swap-with-last removal, refusing to remove every remaining witness.
// The indexes slice is sorted in place as a side effect.
//
// NOTE: requires a providerMutex lock
func (c *Client) removeWitnesses(indexes []int) error {
	// refuse to leave the client without any witnesses
	if len(indexes) >= len(c.witnesses) {
		return ErrNoWitnesses
	}

	// removal must run from the highest index down, otherwise an earlier
	// removal would invalidate the later indexes
	sort.Ints(indexes)
	for i := len(indexes) - 1; i >= 0; i-- {
		last := len(c.witnesses) - 1
		c.witnesses[indexes[i]] = c.witnesses[last]
		c.witnesses = c.witnesses[:last]
	}
	return nil
}
// witnessResponse bundles the result of a single witness light block request
// so it can be sent back over a channel from the request goroutine.
type witnessResponse struct {
	lb           *types.LightBlock // light block returned by the witness (nil on error)
	witnessIndex int               // index of the witness in c.witnesses
	err          error             // error returned by the request, if any
}
// findNewPrimary concurrently sends a light block request to every witness,
// promoting the first witness to return a valid light block as the new
// primary. The remove option indicates whether the old primary should be
// entirely removed or appended to the back of the witnesses list. This method
// also handles witness errors. If no witness is available, it returns the
// last error of the witness.
func (c *Client) findNewPrimary(ctx context.Context, height uint64, remove bool) (*types.LightBlock, error) {
	c.providerMutex.Lock()
	defer c.providerMutex.Unlock()

	// need one witness to promote and at least one to keep as a witness
	if len(c.witnesses) <= 1 {
		return nil, ErrNoWitnesses
	}

	var (
		// buffered to len(c.witnesses) so request goroutines never block on
		// send, even once we stop draining the channel after a success
		witnessResponsesC = make(chan witnessResponse, len(c.witnesses))
		witnessesToRemove []int
		lastError         error
		wg                sync.WaitGroup
	)

	// send out a light block request to all witnesses
	subctx, cancel := context.WithCancel(ctx)
	defer cancel()
	for index := range c.witnesses {
		wg.Add(1)
		go func(witnessIndex int, witnessResponsesC chan witnessResponse) {
			defer wg.Done()

			lb, err := c.witnesses[witnessIndex].LightBlock(subctx, height)
			witnessResponsesC <- witnessResponse{lb, witnessIndex, err}
		}(index, witnessResponsesC)
	}

	// process all the responses as they come in
	for i := 0; i < cap(witnessResponsesC); i++ {
		response := <-witnessResponsesC
		switch response.err {
		// success! We have found a new primary
		case nil:
			cancel() // cancel all remaining requests to other witnesses
			wg.Wait() // wait for all goroutines to finish

			// if we are not intending on removing the primary then append the old
			// primary to the end of the witness slice
			if !remove {
				c.witnesses = append(c.witnesses, c.primary)
			}

			// promote respondent as the new primary
			c.logger.Debug("found new primary", "primary", c.witnesses[response.witnessIndex])
			c.primary = c.witnesses[response.witnessIndex]

			// add promoted witness to the list of witnesses to be removed
			witnessesToRemove = append(witnessesToRemove, response.witnessIndex)

			// remove witnesses marked as bad (the client must do this before we
			// alter the witness slice and change the indexes of witnesses).
			// Removal is done in descending order
			if err := c.removeWitnesses(witnessesToRemove); err != nil {
				return nil, err
			}

			// return the light block that new primary responded with
			return response.lb, nil

		// process benign errors by logging them only
		case provider.ErrNoResponse, provider.ErrLightBlockNotFound:
			lastError = response.err
			c.logger.Debug("error on light block request from witness",
				"error", response.err, "primary", c.witnesses[response.witnessIndex])
			continue

		// process malevolent errors like ErrUnreliableProvider and ErrBadLightBlock
		// by removing the witness
		default:
			lastError = response.err
			c.logger.Error("error on light block request from witness, removing...",
				"error", response.err, "primary", c.witnesses[response.witnessIndex])
			witnessesToRemove = append(witnessesToRemove, response.witnessIndex)
		}
	}

	return nil, lastError
}
// compareFirstHeaderWithWitnesses concurrently compares h with all witnesses. If any
// witness reports a different header than h, the function returns an error.
func (c *Client) compareFirstHeaderWithWitnesses(ctx context.Context, h *types.SignedHeader) error {
compareCtx, cancel := context.WithCancel(ctx)
@@ -1007,26 +1098,20 @@ func (c *Client) compareFirstHeaderWithWitnesses(ctx context.Context, h *types.S
case nil:
continue
case errConflictingHeaders:
c.logger.Error(fmt.Sprintf(`Witness #%d has a different header. Please check primary is correct
and remove witness. Otherwise, use the different primary`, e.WitnessIndex), "witness", c.witnesses[e.WitnessIndex])
c.logger.Error(fmt.Sprintf(`witness #%d has a different header. Please check primary is correct
and remove witness. Otherwise, use a different primary`, e.WitnessIndex), "witness", c.witnesses[e.WitnessIndex])
return err
case errBadWitness:
// If witness sent us an invalid header, then remove it. If it didn't
// respond or couldn't find the block, then we ignore it and move on to
// the next witness.
if _, ok := e.Reason.(provider.ErrBadLightBlock); ok {
c.logger.Info("Witness sent us invalid header / vals -> removing it", "witness", c.witnesses[e.WitnessIndex])
c.logger.Info("Witness sent an invalid light block, removing...", "witness", c.witnesses[e.WitnessIndex])
witnessesToRemove = append(witnessesToRemove, e.WitnessIndex)
}
}
}
// we need to make sure that we remove witnesses by index in the reverse
// order so as to not affect the indexes themselves
sort.Ints(witnessesToRemove)
for i := len(witnessesToRemove) - 1; i >= 0; i-- {
c.removeWitness(witnessesToRemove[i])
}
return nil
// remove all witnesses that misbehaved
return c.removeWitnesses(witnessesToRemove)
}

View File

@@ -740,7 +740,7 @@ func TestClient_Concurrency(t *testing.T) {
defer wg.Done()
// NOTE: Cleanup, Stop, VerifyLightBlockAtHeight and Verify are not supposed
// to be concurrenly safe.
// to be concurrently safe.
assert.Equal(t, chainID, c.ChainID())
@@ -774,8 +774,12 @@ func TestClientReplacesPrimaryWithWitnessIfPrimaryIsUnavailable(t *testing.T) {
_, err = c.Update(ctx, bTime.Add(2*time.Hour))
require.NoError(t, err)
// the primary should no longer be the deadNode
assert.NotEqual(t, c.Primary(), deadNode)
assert.Equal(t, 1, len(c.Witnesses()))
// we should still have the dead node as a witness because it
// hasn't repeatedly been unresponsive yet
assert.Equal(t, 2, len(c.Witnesses()))
}
func TestClient_BackwardsVerification(t *testing.T) {

View File

@@ -5,7 +5,6 @@ import (
"context"
"errors"
"fmt"
"sort"
"time"
"github.com/tendermint/tendermint/light/provider"
@@ -35,7 +34,7 @@ func (c *Client) detectDivergence(ctx context.Context, primaryTrace []*types.Lig
lastVerifiedHeader = primaryTrace[len(primaryTrace)-1].SignedHeader
witnessesToRemove = make([]int, 0)
)
c.logger.Debug("Running detector against trace", "endBlockHeight", lastVerifiedHeader.Height,
c.logger.Debug("running detector against trace", "endBlockHeight", lastVerifiedHeader.Height,
"endBlockHash", lastVerifiedHeader.Hash, "length", len(primaryTrace))
c.providerMutex.Lock()
@@ -75,7 +74,7 @@ func (c *Client) detectDivergence(ctx context.Context, primaryTrace []*types.Lig
now,
)
if err != nil {
c.logger.Info("Error validating witness's divergent header", "witness", supportingWitness, "err", err)
c.logger.Info("error validating witness's divergent header", "witness", supportingWitness, "err", err)
witnessesToRemove = append(witnessesToRemove, e.WitnessIndex)
continue
}
@@ -83,7 +82,7 @@ func (c *Client) detectDivergence(ctx context.Context, primaryTrace []*types.Lig
// We are suspecting that the primary is faulty, hence we hold the witness as the source of truth
// and generate evidence against the primary that we can send to the witness
primaryEv := newLightClientAttackEvidence(primaryBlock, witnessTrace[len(witnessTrace)-1], witnessTrace[0])
c.logger.Error("Attempted attack detected. Sending evidence againt primary by witness", "ev", primaryEv,
c.logger.Error("ATTEMPTED ATTACK DETECTED. Sending evidence againt primary by witness", "ev", primaryEv,
"primary", c.primary, "witness", supportingWitness)
c.sendEvidence(ctx, primaryEv, supportingWitness)
@@ -117,22 +116,17 @@ func (c *Client) detectDivergence(ctx context.Context, primaryTrace []*types.Lig
return ErrLightClientAttack
case errBadWitness:
c.logger.Info("Witness returned an error during header comparison", "witness", c.witnesses[e.WitnessIndex],
"err", err)
// if witness sent us an invalid header, then remove it. If it didn't respond or couldn't find the block, then we
// ignore it and move on to the next witness
if _, ok := e.Reason.(provider.ErrBadLightBlock); ok {
c.logger.Info("Witness sent us invalid header / vals -> removing it", "witness", c.witnesses[e.WitnessIndex])
witnessesToRemove = append(witnessesToRemove, e.WitnessIndex)
}
c.logger.Info("witness returned an error during header comparison, removing...",
"witness", c.witnesses[e.WitnessIndex], "err", err)
witnessesToRemove = append(witnessesToRemove, e.WitnessIndex)
default:
c.logger.Debug("error in light block request to witness", "err", err)
}
}
// we need to make sure that we remove witnesses by index in the reverse
// order so as to not affect the indexes themselves
sort.Ints(witnessesToRemove)
for i := len(witnessesToRemove) - 1; i >= 0; i-- {
c.removeWitness(witnessesToRemove[i])
// remove witnesses that have misbehaved
if err := c.removeWitnesses(witnessesToRemove); err != nil {
return err
}
// 1. If we had at least one witness that returned the same header then we
@@ -156,7 +150,17 @@ func (c *Client) compareNewHeaderWithWitness(ctx context.Context, errc chan erro
witness provider.Provider, witnessIndex int) {
lightBlock, err := witness.LightBlock(ctx, h.Height)
if err != nil {
switch err {
case nil:
break
case provider.ErrNoResponse, provider.ErrLightBlockNotFound:
errc <- err
return
default:
// all other errors (i.e. invalid block or unreliable provider) we mark the witness as bad
// and remove it
errc <- errBadWitness{Reason: err, WitnessIndex: witnessIndex}
return
}
@@ -165,7 +169,7 @@ func (c *Client) compareNewHeaderWithWitness(ctx context.Context, errc chan erro
errc <- errConflictingHeaders{Block: lightBlock, WitnessIndex: witnessIndex}
}
c.logger.Debug("Matching header received by witness", "height", h.Height, "witness", witnessIndex)
c.logger.Debug("matching header received by witness", "height", h.Height, "witness", witnessIndex)
errc <- nil
}
@@ -173,7 +177,7 @@ func (c *Client) compareNewHeaderWithWitness(ctx context.Context, errc chan erro
func (c *Client) sendEvidence(ctx context.Context, ev *types.LightClientAttackEvidence, receiver provider.Provider) {
err := receiver.ReportEvidence(ctx, ev)
if err != nil {
c.logger.Error("Failed to report evidence to provider", "ev", ev, "provider", receiver)
c.logger.Error("failed to report evidence to provider", "ev", ev, "provider", receiver)
}
}

View File

@@ -256,5 +256,5 @@ func TestClientDivergentTraces3(t *testing.T) {
_, err = c.VerifyLightBlockAtHeight(ctx, 10, bTime.Add(1*time.Hour))
assert.Error(t, err)
assert.Equal(t, 0, len(c.Witnesses()))
assert.Equal(t, 1, len(c.Witnesses()))
}

View File

@@ -7,15 +7,15 @@ import (
var (
// ErrLightBlockNotFound is returned when a provider can't find the
// requested header.
// requested header. The light client will not remove the provider
ErrLightBlockNotFound = errors.New("light block not found")
// ErrNoResponse is returned if the provider doesn't respond to the
// request in a gieven time
// request in a given time. The light client will not remove the provider
ErrNoResponse = errors.New("client failed to respond")
)
// ErrBadLightBlock is returned when a provider returns an invalid
// light block.
// light block. The light client will remove the provider.
type ErrBadLightBlock struct {
Reason error
}
@@ -23,3 +23,14 @@ type ErrBadLightBlock struct {
func (e ErrBadLightBlock) Error() string {
return fmt.Sprintf("client provided bad signed header: %s", e.Reason.Error())
}
// ErrUnreliableProvider is a generic error indicating that the provider is
// misbehaving towards the light client. On receiving it, the light client
// will remove the provider.
type ErrUnreliableProvider struct {
	// Reason describes why the provider was deemed unreliable.
	Reason string
}

// Error implements the error interface.
func (e ErrUnreliableProvider) Error() string {
	return fmt.Sprint("client deemed unreliable: ", e.Reason)
}

View File

@@ -18,8 +18,10 @@ import (
)
var defaultOptions = Options{
MaxRetryAttempts: 5,
Timeout: 3 * time.Second,
MaxRetryAttempts: 5,
Timeout: 3 * time.Second,
NoBlockThreshold: 5,
NoResponseThreshold: 5,
}
// http provider uses an RPC client to obtain the necessary information.
@@ -27,14 +29,37 @@ type http struct {
chainID string
client rpcclient.RemoteClient
maxRetryAttempts int
// http provider heuristics
// The provider tracks the amount of times that the
// client doesn't respond. If this exceeds the threshold
// then the provider will return an unreliable provider error
noResponseThreshold uint16
noResponseCount uint16
// The provider tracks the amount of time the client
// doesn't have a block. If this exceeds the threshold
// then the provider will return an unreliable provider error
noBlockThreshold uint16
noBlockCount uint16
// In a single request, the provider attempts multiple times
// with exponential backoff to reach the client. If this
// exceeds the maxRetryAttempts, this results in an ErrNoResponse
maxRetryAttempts uint16
}
type Options struct {
// -1 means no limit
MaxRetryAttempts int
// 0 means no retries
MaxRetryAttempts uint16
// 0 means no timeout.
Timeout time.Duration
// The amount of requests that a client doesn't have the block
// for before the provider deems the client unreliable
NoBlockThreshold uint16
// The amount of requests that a client doesn't respond to
// before the provider deems the client unreliable
NoResponseThreshold uint16
}
// New creates a HTTP provider, which is using the rpchttp.HTTP client under
@@ -67,9 +92,11 @@ func NewWithClient(chainID string, client rpcclient.RemoteClient) provider.Provi
// NewWithClient allows you to provide a custom client.
func NewWithClientAndOptions(chainID string, client rpcclient.RemoteClient, options Options) provider.Provider {
return &http{
client: client,
chainID: chainID,
maxRetryAttempts: options.MaxRetryAttempts,
client: client,
chainID: chainID,
maxRetryAttempts: options.MaxRetryAttempts,
noResponseThreshold: options.NoResponseThreshold,
noBlockThreshold: options.NoBlockThreshold,
}
}
@@ -130,7 +157,8 @@ func (p *http) validatorSet(ctx context.Context, height *uint64) (*types.Validat
for len(vals) != total && page <= maxPages {
// create another for loop to control retries. If p.maxRetryAttempts
// is negative we will keep repeating.
for attempt := 0; attempt != p.maxRetryAttempts+1; attempt++ {
attempt := uint16(0)
for {
res, err := p.client.Validators(ctx, height, &page, &perPage)
switch e := err.(type) {
case nil: // success!! Now we validate the response
@@ -149,8 +177,13 @@ func (p *http) validatorSet(ctx context.Context, height *uint64) (*types.Validat
case *url.Error:
if e.Timeout() {
// if we have exceeded retry attempts then return a no response error
if attempt == p.maxRetryAttempts {
return nil, p.noResponse()
}
attempt++
// request timed out: we wait and try again with exponential backoff
time.Sleep(backoffTimeout(uint16(attempt)))
time.Sleep(backoffTimeout(attempt))
continue
}
return nil, provider.ErrBadLightBlock{Reason: e}
@@ -188,7 +221,7 @@ func (p *http) validatorSet(ctx context.Context, height *uint64) (*types.Validat
func (p *http) signedHeader(ctx context.Context, height *uint64) (*types.SignedHeader, error) {
// create a for loop to control retries; the request is attempted
// at most p.maxRetryAttempts+1 times.
for attempt := 0; attempt != p.maxRetryAttempts+1; attempt++ {
for attempt := uint16(0); attempt != p.maxRetryAttempts+1; attempt++ {
commit, err := p.client.Commit(ctx, height)
switch e := err.(type) {
case nil: // success!!
@@ -197,24 +230,20 @@ func (p *http) signedHeader(ctx context.Context, height *uint64) (*types.SignedH
case *url.Error:
if e.Timeout() {
// we wait and try again with exponential backoff
time.Sleep(backoffTimeout(uint16(attempt)))
time.Sleep(backoffTimeout(attempt))
continue
}
return nil, provider.ErrBadLightBlock{Reason: e}
case *rpctypes.RPCError:
// Check if we got something other than internal error. This shouldn't happen unless the RPC module
// or light client has been tampered with. If we do get this error, stop the connection with the
// peer and return an error
if e.Code != -32603 {
return nil, provider.ErrBadLightBlock{Reason: errors.New(e.Data)}
// check if the error indicates that the peer doesn't have the block
if strings.Contains(e.Data, ctypes.ErrHeightNotAvailable.Error()) ||
strings.Contains(e.Data, ctypes.ErrHeightExceedsChainHead.Error()) {
return nil, p.noBlock()
}
// check if the error indicates that the peer doesn't have the block
if strings.Contains(err.Error(), ctypes.ErrHeightNotAvailable.Error()) ||
strings.Contains(err.Error(), ctypes.ErrHeightExceedsChainHead.Error()) {
return nil, provider.ErrLightBlockNotFound
}
// for every other error, the provider returns a bad block
return nil, provider.ErrBadLightBlock{Reason: errors.New(e.Data)}
default:
// If we don't know the error then by default we return a bad light block error and
@@ -222,7 +251,27 @@ func (p *http) signedHeader(ctx context.Context, height *uint64) (*types.SignedH
return nil, provider.ErrBadLightBlock{Reason: e}
}
}
return nil, provider.ErrNoResponse
return nil, p.noResponse()
}
// noResponse records one more failed attempt to get any response from the
// client. Once the running count exceeds noResponseThreshold, the provider
// reports itself as unreliable (so the light client will drop it); until
// then the benign ErrNoResponse is returned.
//
// NOTE(review): the counter is never reset in this method; whether it is
// reset elsewhere on success is not visible here — confirm before relying
// on per-request semantics.
func (p *http) noResponse() error {
	p.noResponseCount++
	if p.noResponseCount > p.noResponseThreshold {
		return provider.ErrUnreliableProvider{
			Reason: fmt.Sprintf("failed to respond after %d attempts", p.noResponseCount),
		}
	}
	return provider.ErrNoResponse
}
// noBlock records one more request for which the client did not have the
// block. Once the running count exceeds noBlockThreshold, the provider
// reports itself as unreliable (so the light client will drop it); until
// then the benign ErrLightBlockNotFound is returned.
//
// NOTE(review): the counter is never reset in this method; whether it is
// reset elsewhere on success is not visible here — confirm before relying
// on per-request semantics.
func (p *http) noBlock() error {
	p.noBlockCount++
	if p.noBlockCount > p.noBlockThreshold {
		return provider.ErrUnreliableProvider{
			Reason: fmt.Sprintf("failed to provide a block after %d attempts", p.noBlockCount),
		}
	}
	return provider.ErrLightBlockNotFound
}
func validateHeight(height uint64) (*uint64, error) {

View File

@@ -66,25 +66,33 @@ func TestProvider(t *testing.T) {
require.NoError(t, err)
// let's get the highest block
sh, err := p.LightBlock(context.Background(), 0)
lb, err := p.LightBlock(context.Background(), 0)
require.NoError(t, err)
assert.True(t, sh.Height < 1000)
assert.True(t, lb.Height < 1000)
// let's check this is valid somehow
assert.Nil(t, sh.ValidateBasic(chainID))
assert.Nil(t, lb.ValidateBasic(chainID))
// historical queries now work :)
lower := sh.Height - 3
sh, err = p.LightBlock(context.Background(), lower)
lower := lb.Height - 3
lb, err = p.LightBlock(context.Background(), lower)
require.NoError(t, err)
assert.Equal(t, lower, sh.Height)
assert.Equal(t, lower, lb.Height)
// // fetching missing heights (both future and pruned) should return appropriate errors
_, err = p.LightBlock(context.Background(), 1000)
lb, err = p.LightBlock(context.Background(), 1000)
require.Error(t, err)
require.Nil(t, lb)
assert.Equal(t, provider.ErrLightBlockNotFound, err)
_, err = p.LightBlock(context.Background(), 1)
require.Error(t, err)
assert.Equal(t, provider.ErrLightBlockNotFound, err)
// if the provider is unable to provide four more blocks then we should return
// an unreliable peer error
for i := 0; i < 4; i++ {
_, err = p.LightBlock(context.Background(), 1)
}
assert.IsType(t, provider.ErrUnreliableProvider{}, err)
}

View File

@@ -2,15 +2,12 @@ package mock
import (
"context"
"errors"
"fmt"
"github.com/tendermint/tendermint/light/provider"
"github.com/tendermint/tendermint/types"
)
var errNoResp = errors.New("no response from provider")
type deadMock struct {
id string
}
@@ -25,9 +22,9 @@ func (p *deadMock) String() string {
}
func (p *deadMock) LightBlock(_ context.Context, height uint64) (*types.LightBlock, error) {
return nil, errNoResp
return nil, provider.ErrNoResponse
}
func (p *deadMock) ReportEvidence(_ context.Context, ev types.Evidence) error {
return errNoResp
return provider.ErrNoResponse
}

View File

@@ -8,7 +8,9 @@ import (
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/light"
lrpc "github.com/tendermint/tendermint/light/rpc"
rpchttp "github.com/tendermint/tendermint/rpc/client/http"
rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server"
)
@@ -21,6 +23,28 @@ type Proxy struct {
Listener net.Listener
}
// NewProxy creates the struct used to run an HTTP server for serving light
// client rpc requests.
//
// listenAddr is the TCP address the proxy will serve on; providerAddr is the
// RPC address of the node backing the light client. The outbound rpc client's
// timeout is taken from the server config's WriteTimeout.
func NewProxy(
	lightClient *light.Client,
	listenAddr, providerAddr string,
	config *rpcserver.Config,
	logger log.Logger,
	opts ...lrpc.Option,
) (*Proxy, error) {
	// reuse the server's write timeout as the outbound http client timeout
	rpcClient, err := rpchttp.NewWithTimeout(providerAddr, config.WriteTimeout)
	if err != nil {
		return nil, fmt.Errorf("failed to create http client for %s: %w", providerAddr, err)
	}

	return &Proxy{
		Addr:   listenAddr,
		Config: config,
		Client: lrpc.NewClient(rpcClient, lightClient, opts...),
		Logger: logger,
	}, nil
}
// ListenAndServe configures the rpcserver.WebsocketManager, sets up the RPC
// routes to proxy via Client, and starts up an HTTP server on the TCP network
// address p.Addr.

View File

@@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"regexp"
"time"
"github.com/gogo/protobuf/proto"
@@ -59,6 +60,27 @@ func KeyPathFn(fn KeyPathFunc) Option {
}
}
// DefaultMerkleKeyPathFn creates a function used to generate merkle key paths
// from a path string and a key. This is the default used by the cosmos SDK.
// These merkle key paths are required when verifying /abci_query calls.
func DefaultMerkleKeyPathFn() KeyPathFunc {
	// regexp for extracting the store name from an /abci_query path,
	// e.g. "/store/acc/key" -> "acc". Compiled once, outside the returned
	// closure. ('/' needs no escaping in Go's RE2 syntax.)
	storeNameRegexp := regexp.MustCompile(`/store/(.+)/key`)

	return func(path string, key []byte) (merkle.KeyPath, error) {
		// matches[0] is the whole match, matches[1] the captured store name
		matches := storeNameRegexp.FindStringSubmatch(path)
		if len(matches) != 2 {
			return nil, fmt.Errorf("can't find store name in %s using %s", path, storeNameRegexp)
		}
		storeName := matches[1]

		// key path is <storeName>/<key>, both URL-encoded
		kp := merkle.KeyPath{}
		kp = kp.AppendKey([]byte(storeName), merkle.KeyEncodingURL)
		kp = kp.AppendKey(key, merkle.KeyEncodingURL)
		return kp, nil
	}
}
// NewClient returns a new client.
func NewClient(next rpcclient.Client, lc LightClient, opts ...Option) *Client {
c := &Client{

View File

@@ -130,7 +130,7 @@ func VerifyAdjacent(
// Check the validator hashes are the same
if !bytes.Equal(untrustedHeader.ValidatorsHash, trustedHeader.NextValidatorsHash) {
err := fmt.Errorf("expected old header next validators (%X) to match those from new header (%X)",
err := fmt.Errorf("expected old header's next validators (%X) to match those from new header (%X)",
trustedHeader.NextValidatorsHash,
untrustedHeader.ValidatorsHash,
)

View File

@@ -277,13 +277,17 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx
// so we only record the sender for txs still in the mempool.
if e, ok := mem.txsMap.Load(TxKey(tx)); ok {
memTx := e.(*clist.CElement).Value.(*mempoolTx)
memTx.senders.LoadOrStore(txInfo.SenderID, true)
_, loaded := memTx.senders.LoadOrStore(txInfo.SenderID, true)
// TODO: consider punishing peer for dups,
// its non-trivial since invalid txs can become valid,
// but they can spam the same tx with little cost to them atm.
if loaded {
return ErrTxInCache
}
}
return ErrTxInCache
mem.logger.Debug("tx exists already in cache", "tx_hash", tx.Hash())
return nil
}
ctx := context.Background()
@@ -437,7 +441,7 @@ func (mem *CListMempool) resCbFirstTime(
}
memTx.senders.Store(peerID, true)
mem.addTx(memTx)
mem.logger.Info("Added good transaction",
mem.logger.Debug("added good transaction",
"tx", txID(tx),
"res", r,
"height", memTx.height,
@@ -446,7 +450,7 @@ func (mem *CListMempool) resCbFirstTime(
mem.notifyTxsAvailable()
} else {
// ignore bad transaction
mem.logger.Info("Rejected bad transaction",
mem.logger.Debug("rejected bad transaction",
"tx", txID(tx), "peerID", peerP2PID, "res", r, "err", postCheckErr)
mem.metrics.FailedTxs.Add(1)
if !mem.config.KeepInvalidTxsInCache {
@@ -482,7 +486,7 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
// Good, nothing to do.
} else {
// Tx became invalidated due to newly committed block.
mem.logger.Info("Tx is no longer valid", "tx", txID(tx), "res", r, "err", postCheckErr)
mem.logger.Debug("tx is no longer valid", "tx", txID(tx), "res", r, "err", postCheckErr)
// NOTE: we remove tx from the cache because it might be good later
mem.removeTx(tx, mem.recheckCursor, !mem.config.KeepInvalidTxsInCache)
}
@@ -493,7 +497,7 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
}
if mem.recheckCursor == nil {
// Done!
mem.logger.Info("Done rechecking txs")
mem.logger.Debug("done rechecking txs")
// incase the recheck removed all txs
if mem.Size() > 0 {
@@ -622,7 +626,7 @@ func (mem *CListMempool) Update(
// or just notify there're some txs left.
if mem.Size() > 0 {
if mem.config.Recheck {
mem.logger.Info("Recheck txs", "numtxs", mem.Size(), "height", height)
mem.logger.Debug("recheck txs", "numtxs", mem.Size(), "height", height)
mem.recheckTxs()
// At this point, mem.txs are being rechecked.
// mem.recheckCursor re-scans mem.txs and possibly removes some txs.

View File

@@ -190,9 +190,7 @@ func TestMempoolUpdate(t *testing.T) {
err := mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
require.NoError(t, err)
err = mempool.CheckTx([]byte{0x01}, nil, TxInfo{})
if assert.Error(t, err) {
assert.Equal(t, ErrTxInCache, err)
}
require.NoError(t, err)
}
// 2. Removes valid txs from the mempool
@@ -245,15 +243,11 @@ func TestMempool_KeepInvalidTxsInCache(t *testing.T) {
// a must be added to the cache
err = mempool.CheckTx(a, nil, TxInfo{})
if assert.Error(t, err) {
assert.Equal(t, ErrTxInCache, err)
}
require.NoError(t, err)
// b must remain in the cache
err = mempool.CheckTx(b, nil, TxInfo{})
if assert.Error(t, err) {
assert.Equal(t, ErrTxInCache, err)
}
require.NoError(t, err)
}
// 2. An invalid transaction must remain in the cache
@@ -266,11 +260,6 @@ func TestMempool_KeepInvalidTxsInCache(t *testing.T) {
err := mempool.CheckTx(a, nil, TxInfo{})
require.NoError(t, err)
err = mempool.CheckTx(a, nil, TxInfo{})
if assert.Error(t, err) {
assert.Equal(t, ErrTxInCache, err)
}
}
}

View File

@@ -180,7 +180,7 @@ func (r *Reactor) handleMempoolMessage(envelope p2p.Envelope) error {
for _, tx := range protoTxs {
if err := r.mempool.CheckTx(types.Tx(tx), nil, txInfo); err != nil {
logger.Error("checktx failed for tx", "tx", txID(tx), "err", err)
logger.Error("checktx failed for tx", "tx", fmt.Sprintf("%X", txID(tx)), "err", err)
}
}
@@ -376,7 +376,7 @@ func (r *Reactor) broadcastTxRoutine(peerID p2p.NodeID, closer *tmsync.Closer) {
Txs: [][]byte{memTx.tx},
},
}
r.Logger.Debug("gossiped tx to peer", "tx", txID(memTx.tx), "peer", peerID)
r.Logger.Debug("gossiped tx to peer", "tx", fmt.Sprintf("%X", txID(memTx.tx)), "peer", peerID)
}
select {

View File

@@ -91,7 +91,7 @@ func TestPEXReactorRunning(t *testing.T) {
sw.AddReactor("pex", r)
return sw
})
}, logger)
}
for _, sw := range switches {
@@ -406,7 +406,7 @@ func TestPEXReactorSeedModeFlushStop(t *testing.T) {
sw.AddReactor("pex", r)
return sw
})
}, logger)
}
for _, sw := range switches {
@@ -567,6 +567,7 @@ func testCreatePeerWithConfig(dir string, id int, config *ReactorConfig) *p2p.Sw
sw.AddReactor("pex", r)
return sw
},
log.TestingLogger(),
)
return peer
}
@@ -600,6 +601,7 @@ func testCreateSeed(dir string, id int, knownAddrs, srcAddrs []*p2p.NetAddress)
sw.AddReactor("pex", r)
return sw
},
log.TestingLogger(),
)
return seed
}
@@ -629,8 +631,7 @@ func createSwitchAndAddReactors(reactors ...p2p.Reactor) *p2p.Switch {
sw.AddReactor(r.String(), r)
}
return sw
})
sw.SetLogger(log.TestingLogger())
}, log.TestingLogger())
return sw
}

View File

@@ -110,7 +110,6 @@ func (rs *ReactorShim) proxyPeerEnvelopes() {
rs.Logger.Error(
"failed to proxy envelope; failed to wrap message",
"ch_id", cs.Descriptor.ID,
"msg", e.Message,
"err", err,
)
continue
@@ -124,7 +123,6 @@ func (rs *ReactorShim) proxyPeerEnvelopes() {
rs.Logger.Error(
"failed to proxy envelope; failed to encode message",
"ch_id", cs.Descriptor.ID,
"msg", e.Message,
"err", err,
)
continue
@@ -140,7 +138,6 @@ func (rs *ReactorShim) proxyPeerEnvelopes() {
rs.Logger.Debug(
"failed to proxy envelope; failed to find peer",
"ch_id", cs.Descriptor.ID,
"msg", e.Message,
"peer", e.To,
)
continue
@@ -150,13 +147,12 @@ func (rs *ReactorShim) proxyPeerEnvelopes() {
rs.Logger.Error(
"failed to proxy message to peer",
"ch_id", cs.Descriptor.ID,
"msg", e.Message,
"peer", e.To,
)
}
default:
rs.Logger.Error("failed to proxy envelope; missing peer ID", "ch_id", cs.Descriptor.ID, "msg", e.Message)
rs.Logger.Error("failed to proxy envelope; missing peer ID", "ch_id", cs.Descriptor.ID)
}
}
}(cs)
@@ -294,7 +290,7 @@ func (rs *ReactorShim) Receive(chID byte, src Peer, msgBytes []byte) {
msg.Reset()
if err := proto.Unmarshal(msgBytes, msg); err != nil {
rs.Logger.Error("error decoding message", "peer", src, "ch_id", cID, "msg", msg, "err", err)
rs.Logger.Error("error decoding message", "peer", src, "ch_id", cID, "err", err)
rs.Switch.StopPeerForError(src, err)
return
}
@@ -302,7 +298,7 @@ func (rs *ReactorShim) Receive(chID byte, src Peer, msgBytes []byte) {
validator, ok := msg.(messageValidator)
if ok {
if err := validator.Validate(); err != nil {
rs.Logger.Error("invalid message", "peer", src, "ch_id", cID, "msg", msg, "err", err)
rs.Logger.Error("invalid message", "peer", src, "ch_id", cID, "err", err)
rs.Switch.StopPeerForError(src, err)
return
}
@@ -314,7 +310,7 @@ func (rs *ReactorShim) Receive(chID byte, src Peer, msgBytes []byte) {
msg, err = wrapper.Unwrap()
if err != nil {
rs.Logger.Error("failed to unwrap message", "peer", src, "ch_id", chID, "msg", msg, "err", err)
rs.Logger.Error("failed to unwrap message", "peer", src, "ch_id", chID, "err", err)
return
}
}

View File

@@ -61,7 +61,7 @@ func setup(t *testing.T, peers []p2p.Peer) *reactorShimTestSuite {
sw.AddReactor(rts.shim.Name, rts.shim)
return sw
})
}, log.TestingLogger())
// start the reactor shim
require.NoError(t, rts.shim.Start())

View File

@@ -186,7 +186,7 @@ func assertMsgReceivedWithTimeout(
}
func TestSwitchFiltersOutItself(t *testing.T) {
s1 := MakeSwitch(cfg, 1, "127.0.0.1", "123.123.123", initSwitchFunc)
s1 := MakeSwitch(cfg, 1, "127.0.0.1", "123.123.123", initSwitchFunc, log.TestingLogger())
// simulate s1 having a public IP by creating a remote peer with the same ID
rp := &remotePeer{PrivKey: s1.nodeKey.PrivKey, Config: cfg}
@@ -225,6 +225,7 @@ func TestSwitchPeerFilter(t *testing.T) {
"testing",
"123.123.123",
initSwitchFunc,
log.TestingLogger(),
SwitchPeerFilters(filters...),
)
)
@@ -280,6 +281,7 @@ func TestSwitchPeerFilterTimeout(t *testing.T) {
"testing",
"123.123.123",
initSwitchFunc,
log.TestingLogger(),
SwitchFilterTimeout(5*time.Millisecond),
SwitchPeerFilters(filters...),
)
@@ -319,7 +321,7 @@ func TestSwitchPeerFilterTimeout(t *testing.T) {
}
func TestSwitchPeerFilterDuplicate(t *testing.T) {
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc, log.TestingLogger())
err := sw.Start()
require.NoError(t, err)
t.Cleanup(func() {
@@ -372,7 +374,7 @@ func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration)
func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
assert, require := assert.New(t), require.New(t)
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc, log.TestingLogger())
err := sw.Start()
if err != nil {
t.Error(err)
@@ -471,7 +473,7 @@ func TestSwitchStopPeerForError(t *testing.T) {
}
func TestSwitchReconnectsToOutboundPersistentPeer(t *testing.T) {
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc, log.TestingLogger())
err := sw.Start()
require.NoError(t, err)
t.Cleanup(func() {
@@ -521,7 +523,7 @@ func TestSwitchReconnectsToOutboundPersistentPeer(t *testing.T) {
}
func TestSwitchReconnectsToInboundPersistentPeer(t *testing.T) {
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc, log.TestingLogger())
err := sw.Start()
require.NoError(t, err)
t.Cleanup(func() {
@@ -554,7 +556,7 @@ func TestSwitchDialPeersAsync(t *testing.T) {
return
}
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc, log.TestingLogger())
err := sw.Start()
require.NoError(t, err)
t.Cleanup(func() {
@@ -620,7 +622,7 @@ func TestSwitchAcceptRoutine(t *testing.T) {
}
// make switch
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc, log.TestingLogger())
err := sw.AddUnconditionalPeerIDs(unconditionalPeerIDs)
require.NoError(t, err)
err = sw.Start()
@@ -785,7 +787,7 @@ func TestSwitchInitPeerIsNotCalledBeforeRemovePeer(t *testing.T) {
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", func(i int, sw *Switch) *Switch {
sw.AddReactor("mock", reactor)
return sw
})
}, log.TestingLogger())
err := sw.Start()
require.NoError(t, err)
t.Cleanup(func() {

View File

@@ -71,7 +71,7 @@ func MakeConnectedSwitches(cfg *config.P2PConfig,
) []*Switch {
switches := make([]*Switch, n)
for i := 0; i < n; i++ {
switches[i] = MakeSwitch(cfg, i, TestHost, "123.123.123", initSwitch)
switches[i] = MakeSwitch(cfg, i, TestHost, "123.123.123", initSwitch, log.TestingLogger())
}
if err := StartSwitches(switches); err != nil {
@@ -163,6 +163,7 @@ func MakeSwitch(
i int,
network, version string,
initSwitch func(int, *Switch) *Switch,
logger log.Logger,
opts ...SwitchOption,
) *Switch {
@@ -175,13 +176,13 @@ func MakeSwitch(
panic(err)
}
logger := log.TestingLogger().With("switch", i)
t := NewMConnTransport(logger, MConnConfig(cfg),
swLogger := logger.With("switch", i)
t := NewMConnTransport(swLogger, MConnConfig(cfg),
[]*ChannelDescriptor{}, MConnTransportOptions{})
// TODO: let the config be passed in?
sw := initSwitch(i, NewSwitch(cfg, t, opts...))
sw.SetLogger(log.TestingLogger().With("switch", i))
sw.SetLogger(swLogger)
sw.SetNodeKey(nodeKey)
if err := t.Listen(addr.Endpoint()); err != nil {

View File

@@ -185,26 +185,26 @@ func GenFilePV(keyFilePath, stateFilePath, keyType string) (*FilePV, error) {
// LoadFilePV loads a FilePV from the filePaths. The FilePV handles double
// signing prevention by persisting data to the stateFilePath. If either file path
// does not exist, the program will exit.
func LoadFilePV(keyFilePath, stateFilePath string) *FilePV {
func LoadFilePV(keyFilePath, stateFilePath string) (*FilePV, error) {
return loadFilePV(keyFilePath, stateFilePath, true)
}
// LoadFilePVEmptyState loads a FilePV from the given keyFilePath, with an empty LastSignState.
// If the keyFilePath does not exist, the program will exit.
func LoadFilePVEmptyState(keyFilePath, stateFilePath string) *FilePV {
func LoadFilePVEmptyState(keyFilePath, stateFilePath string) (*FilePV, error) {
return loadFilePV(keyFilePath, stateFilePath, false)
}
// If loadState is true, we load from the stateFilePath. Otherwise, we use an empty LastSignState.
func loadFilePV(keyFilePath, stateFilePath string, loadState bool) *FilePV {
func loadFilePV(keyFilePath, stateFilePath string, loadState bool) (*FilePV, error) {
keyJSONBytes, err := ioutil.ReadFile(keyFilePath)
if err != nil {
tmos.Exit(err.Error())
return nil, err
}
pvKey := FilePVKey{}
err = tmjson.Unmarshal(keyJSONBytes, &pvKey)
if err != nil {
tmos.Exit(fmt.Sprintf("Error reading PrivValidator key from %v: %v\n", keyFilePath, err))
return nil, fmt.Errorf("error reading PrivValidator key from %v: %w", keyFilePath, err)
}
// overwrite pubkey and address for convenience
@@ -217,11 +217,11 @@ func loadFilePV(keyFilePath, stateFilePath string, loadState bool) *FilePV {
if loadState {
stateJSONBytes, err := ioutil.ReadFile(stateFilePath)
if err != nil {
tmos.Exit(err.Error())
return nil, err
}
err = tmjson.Unmarshal(stateJSONBytes, &pvState)
if err != nil {
tmos.Exit(fmt.Sprintf("Error reading PrivValidator state from %v: %v\n", stateFilePath, err))
return nil, fmt.Errorf("error reading PrivValidator state from %v: %w", stateFilePath, err)
}
}
@@ -230,7 +230,7 @@ func loadFilePV(keyFilePath, stateFilePath string, loadState bool) *FilePV {
return &FilePV{
Key: pvKey,
LastSignState: pvState,
}
}, nil
}
// LoadOrGenFilePV loads a FilePV from the given filePaths
@@ -241,7 +241,7 @@ func LoadOrGenFilePV(keyFilePath, stateFilePath string) (*FilePV, error) {
err error
)
if tmos.FileExists(keyFilePath) {
pv = LoadFilePV(keyFilePath, stateFilePath)
pv, err = LoadFilePV(keyFilePath, stateFilePath)
} else {
pv, err = GenFilePV(keyFilePath, stateFilePath, "")
pv.Save()

View File

@@ -36,7 +36,8 @@ func TestGenLoadValidator(t *testing.T) {
privVal.Save()
addr := privVal.GetAddress()
privVal = LoadFilePV(tempKeyFile.Name(), tempStateFile.Name())
privVal, err = LoadFilePV(tempKeyFile.Name(), tempStateFile.Name())
assert.NoError(err)
assert.Equal(addr, privVal.GetAddress(), "expected privval addr to be the same")
assert.Equal(height, privVal.LastSignState.Height, "expected privval.LastHeight to have been saved")
}

View File

@@ -208,7 +208,7 @@ message ResponseDeliverTx {
int64 gas_wanted = 5 [json_name = "gas_wanted"];
int64 gas_used = 6 [json_name = "gas_used"];
repeated Event events = 7
[(gogoproto.nullable) = false, (gogoproto.jsontag) = "events,omitempty"];
[(gogoproto.nullable) = false, (gogoproto.jsontag) = "events,omitempty"]; // nondeterministic
string codespace = 8;
}

View File

@@ -14,7 +14,7 @@ import (
func TestUnsafeDialSeeds(t *testing.T) {
sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123",
func(n int, sw *p2p.Switch) *p2p.Switch { return sw })
func(n int, sw *p2p.Switch) *p2p.Switch { return sw }, log.TestingLogger())
err := sw.Start()
require.NoError(t, err)
t.Cleanup(func() {
@@ -48,7 +48,7 @@ func TestUnsafeDialSeeds(t *testing.T) {
func TestUnsafeDialPeers(t *testing.T) {
sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123",
func(n int, sw *p2p.Switch) *p2p.Switch { return sw })
func(n int, sw *p2p.Switch) *p2p.Switch { return sw }, log.TestingLogger())
sw.SetAddrBook(&p2p.AddrBookMock{
Addrs: make(map[string]struct{}),
OurAddrs: make(map[string]struct{}),

View File

@@ -23,13 +23,12 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.Han
return func(w http.ResponseWriter, r *http.Request) {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
WriteRPCResponseHTTPError(
w,
types.RPCInvalidRequestError(
nil,
fmt.Errorf("error reading request body: %w", err),
),
res := types.RPCInvalidRequestError(nil,
fmt.Errorf("error reading request body: %w", err),
)
if wErr := WriteRPCResponseHTTPError(w, res); wErr != nil {
logger.Error("failed to write response", "res", res, "err", wErr)
}
return
}
@@ -49,12 +48,10 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.Han
// next, try to unmarshal as a single request
var request types.RPCRequest
if err := json.Unmarshal(b, &request); err != nil {
WriteRPCResponseHTTPError(
w,
types.RPCParseError(
fmt.Errorf("error unmarshaling request: %w", err),
),
)
res := types.RPCParseError(fmt.Errorf("error unmarshaling request: %w", err))
if wErr := WriteRPCResponseHTTPError(w, res); wErr != nil {
logger.Error("failed to write response", "res", res, "err", wErr)
}
return
}
requests = []types.RPCRequest{request}
@@ -97,8 +94,9 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.Han
}
args = append(args, fnArgs...)
}
returns := rpcFunc.f.Call(args)
logger.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
logger.Debug("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns)
result, err := unreflectResult(returns)
switch e := err.(type) {
// if no error then return a success response
@@ -122,8 +120,11 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger log.Logger) http.Han
}
}
}
if len(responses) > 0 {
WriteRPCResponseHTTP(w, responses...)
if wErr := WriteRPCResponseHTTP(w, responses...); wErr != nil {
logger.Error("failed to write responses", "res", responses, "err", wErr)
}
}
}
}

View File

@@ -89,7 +89,8 @@ func ServeTLS(
return err
}
// WriteRPCResponseHTTPError marshals res as JSON and writes it to w.
// WriteRPCResponseHTTPError marshals res as JSON (with indent) and writes it
// to w.
//
// Maps JSON RPC error codes to HTTP Status codes as follows:
//
@@ -102,19 +103,17 @@ func ServeTLS(
// 500 -32099..-32000 Server error.
//
// source: https://www.jsonrpc.org/historical/json-rpc-over-http.html
//
// Panics if it can't Marshal res or write to w.
func WriteRPCResponseHTTPError(
w http.ResponseWriter,
res types.RPCResponse,
) {
) error {
if res.Error == nil {
panic("tried to write http error response without RPC error")
}
jsonBytes, err := json.MarshalIndent(res, "", " ")
if err != nil {
panic(err)
return fmt.Errorf("json marshal: %w", err)
}
var httpCode int
@@ -129,15 +128,12 @@ func WriteRPCResponseHTTPError(
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(httpCode)
if _, err := w.Write(jsonBytes); err != nil {
panic(err)
}
_, err = w.Write(jsonBytes)
return err
}
// WriteRPCResponseHTTP marshals res as JSON and writes it to w.
//
// Panics if it can't Marshal res or write to w.
func WriteRPCResponseHTTP(w http.ResponseWriter, res ...types.RPCResponse) {
// WriteRPCResponseHTTP marshals res as JSON (with indent) and writes it to w.
func WriteRPCResponseHTTP(w http.ResponseWriter, res ...types.RPCResponse) error {
var v interface{}
if len(res) == 1 {
v = res[0]
@@ -147,13 +143,12 @@ func WriteRPCResponseHTTP(w http.ResponseWriter, res ...types.RPCResponse) {
jsonBytes, err := json.MarshalIndent(v, "", " ")
if err != nil {
panic(err)
return fmt.Errorf("json marshal: %w", err)
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200)
if _, err := w.Write(jsonBytes); err != nil {
panic(err)
}
_, err = w.Write(jsonBytes)
return err
}
//-----------------------------------------------------------------------------
@@ -191,7 +186,9 @@ func RecoverAndLogHandler(handler http.Handler, logger log.Logger) http.Handler
// If RPCResponse
if res, ok := e.(types.RPCResponse); ok {
WriteRPCResponseHTTP(rww, res)
if wErr := WriteRPCResponseHTTP(rww, res); wErr != nil {
logger.Error("failed to write response", "res", res, "err", wErr)
}
} else {
// Panics can contain anything, attempt to normalize it as an error.
var err error
@@ -205,14 +202,12 @@ func RecoverAndLogHandler(handler http.Handler, logger log.Logger) http.Handler
default:
}
logger.Error(
"Panic in RPC HTTP handler", "err", e, "stack",
string(debug.Stack()),
)
WriteRPCResponseHTTPError(
rww,
types.RPCInternalError(types.JSONRPCIntID(-1), err),
)
logger.Error("panic in RPC HTTP handler", "err", e, "stack", string(debug.Stack()))
res := types.RPCInternalError(types.JSONRPCIntID(-1), err)
if wErr := WriteRPCResponseHTTPError(rww, res); wErr != nil {
logger.Error("failed to write response", "res", res, "err", wErr)
}
}
}
@@ -221,9 +216,11 @@ func RecoverAndLogHandler(handler http.Handler, logger log.Logger) http.Handler
if rww.Status == -1 {
rww.Status = 200
}
logger.Info("Served RPC HTTP response",
"method", r.Method, "url", r.URL,
"status", rww.Status, "duration", durationMS,
logger.Debug("served RPC HTTP response",
"method", r.Method,
"url", r.URL,
"status", rww.Status,
"duration", durationMS,
"remoteAddr", r.RemoteAddr,
)
}()

View File

@@ -112,7 +112,8 @@ func TestWriteRPCResponseHTTP(t *testing.T) {
// one argument
w := httptest.NewRecorder()
WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(id, &sampleResult{"hello"}))
err := WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(id, &sampleResult{"hello"}))
require.NoError(t, err)
resp := w.Result()
body, err := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
@@ -129,9 +130,10 @@ func TestWriteRPCResponseHTTP(t *testing.T) {
// multiple arguments
w = httptest.NewRecorder()
WriteRPCResponseHTTP(w,
err = WriteRPCResponseHTTP(w,
types.NewRPCSuccessResponse(id, &sampleResult{"hello"}),
types.NewRPCSuccessResponse(id, &sampleResult{"world"}))
require.NoError(t, err)
resp = w.Result()
body, err = ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
@@ -159,7 +161,8 @@ func TestWriteRPCResponseHTTP(t *testing.T) {
func TestWriteRPCResponseHTTPError(t *testing.T) {
w := httptest.NewRecorder()
WriteRPCResponseHTTPError(w, types.RPCInternalError(types.JSONRPCIntID(-1), errors.New("foo")))
err := WriteRPCResponseHTTPError(w, types.RPCInternalError(types.JSONRPCIntID(-1), errors.New("foo")))
require.NoError(t, err)
resp := w.Result()
body, err := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()

View File

@@ -27,7 +27,10 @@ func makeHTTPHandler(rpcFunc *RPCFunc, logger log.Logger) func(http.ResponseWrit
// Exception for websocket endpoints
if rpcFunc.ws {
return func(w http.ResponseWriter, r *http.Request) {
WriteRPCResponseHTTPError(w, types.RPCMethodNotFoundError(dummyID))
res := types.RPCMethodNotFoundError(dummyID)
if wErr := WriteRPCResponseHTTPError(w, res); wErr != nil {
logger.Error("failed to write response", "res", res, "err", wErr)
}
}
}
@@ -40,13 +43,12 @@ func makeHTTPHandler(rpcFunc *RPCFunc, logger log.Logger) func(http.ResponseWrit
fnArgs, err := httpParamsToArgs(rpcFunc, r)
if err != nil {
WriteRPCResponseHTTPError(
w,
types.RPCInvalidParamsError(
dummyID,
fmt.Errorf("error converting http params to arguments: %w", err),
),
res := types.RPCInvalidParamsError(dummyID,
fmt.Errorf("error converting http params to arguments: %w", err),
)
if wErr := WriteRPCResponseHTTPError(w, res); wErr != nil {
logger.Error("failed to write response", "res", res, "err", wErr)
}
return
}
args = append(args, fnArgs...)
@@ -58,21 +60,33 @@ func makeHTTPHandler(rpcFunc *RPCFunc, logger log.Logger) func(http.ResponseWrit
switch e := err.(type) {
// if no error then return a success response
case nil:
WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(dummyID, result))
res := types.NewRPCSuccessResponse(dummyID, result)
if wErr := WriteRPCResponseHTTP(w, res); wErr != nil {
logger.Error("failed to write response", "res", res, "err", wErr)
}
// if this already of type RPC error then forward that error.
case *types.RPCError:
WriteRPCResponseHTTPError(w, types.NewRPCErrorResponse(dummyID, e.Code, e.Message, e.Data))
res := types.NewRPCErrorResponse(dummyID, e.Code, e.Message, e.Data)
if wErr := WriteRPCResponseHTTPError(w, res); wErr != nil {
logger.Error("failed to write response", "res", res, "err", wErr)
}
default: // we need to unwrap the error and parse it accordingly
var res types.RPCResponse
switch errors.Unwrap(err) {
case ctypes.ErrZeroOrNegativeHeight, ctypes.ErrZeroOrNegativePerPage,
ctypes.ErrPageOutOfRange, ctypes.ErrInvalidRequest:
WriteRPCResponseHTTPError(w, types.RPCInvalidRequestError(dummyID, err))
case ctypes.ErrZeroOrNegativeHeight,
ctypes.ErrZeroOrNegativePerPage,
ctypes.ErrPageOutOfRange,
ctypes.ErrInvalidRequest:
res = types.RPCInvalidRequestError(dummyID, err)
default: // ctypes.ErrHeightNotAvailable, ctypes.ErrHeightExceedsChainHead:
WriteRPCResponseHTTPError(w, types.RPCInternalError(dummyID, err))
res = types.RPCInternalError(dummyID, err)
}
if wErr := WriteRPCResponseHTTPError(w, res); wErr != nil {
logger.Error("failed to write response", "res", res, "err", wErr)
}
}

View File

@@ -57,27 +57,31 @@ type RPCRequest struct {
// UnmarshalJSON custom JSON unmarshaling due to jsonrpcid being string or int
func (req *RPCRequest) UnmarshalJSON(data []byte) error {
unsafeReq := &struct {
unsafeReq := struct {
JSONRPC string `json:"jsonrpc"`
ID interface{} `json:"id,omitempty"`
Method string `json:"method"`
Params json.RawMessage `json:"params"` // must be map[string]interface{} or []interface{}
}{}
err := json.Unmarshal(data, &unsafeReq)
if err != nil {
return err
}
if unsafeReq.ID == nil { // notification
return nil
}
req.JSONRPC = unsafeReq.JSONRPC
req.Method = unsafeReq.Method
req.Params = unsafeReq.Params
if unsafeReq.ID == nil {
return nil
}
id, err := idFromInterface(unsafeReq.ID)
if err != nil {
return err
}
req.ID = id
return nil
}

View File

@@ -167,7 +167,7 @@ func (blockExec *BlockExecutor) ApplyBlock(
return state, 0, err
}
if len(validatorUpdates) > 0 {
blockExec.logger.Info("updates to validators", "updates", types.ValidatorListString(validatorUpdates))
blockExec.logger.Debug("updates to validators", "updates", types.ValidatorListString(validatorUpdates))
}
// Update the state with the block and responses.

View File

@@ -62,7 +62,7 @@ func VersionFromProto(v tmstate.Version) Version {
// including the last validator set and the consensus params.
// All fields are exposed so the struct can be easily serialized,
// but none of them should be mutated directly.
// Instead, use state.Copy() or state.NextState(...).
// Instead, use state.Copy() or updateState(...).
// NOTE: not goroutine-safe.
type State struct {
Version Version

View File

@@ -67,7 +67,7 @@ func (is *IndexerService) OnStart() error {
if err = is.idr.AddBatch(batch); err != nil {
is.Logger.Error("Failed to index block", "height", height, "err", err)
} else {
is.Logger.Info("Indexed block", "height", height)
is.Logger.Debug("Indexed block", "height", height)
}
}
}()

View File

@@ -477,13 +477,18 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s
}
// Save seen commit (seen +2/3 precommits for block)
// NOTE: we can delete this at a later height
pbsc := seenCommit.ToProto()
seenCommitBytes := mustEncode(pbsc)
if err := batch.Set(seenCommitKey(height), seenCommitBytes); err != nil {
panic(err)
}
// remove the previous seen commit that we have just replaced with the
// canonical commit
if err := batch.Delete(seenCommitKey(height - 1)); err != nil {
panic(err)
}
if err := batch.WriteSync(); err != nil {
panic(err)
}

View File

@@ -35,8 +35,14 @@ func makeTestCommit(height uint64, timestamp time.Time) *types.Commit {
Timestamp: timestamp,
Signature: []byte("Signature"),
}}
return types.NewCommit(height, 0,
types.BlockID{Hash: []byte(""), PartSetHeader: types.PartSetHeader{Hash: []byte(""), Total: 2}}, commitSigs)
return types.NewCommit(
height,
0,
types.BlockID{
Hash: crypto.CRandBytes(32),
PartSetHeader: types.PartSetHeader{Hash: crypto.CRandBytes(32), Total: 2},
},
commitSigs)
}
func makeTxs(height uint64) (txs []types.Tx) {
@@ -504,6 +510,45 @@ func TestBlockFetchAtHeight(t *testing.T) {
require.Nil(t, blockAtHeightPlus2, "expecting an unsuccessful load of Height()+2")
}
func TestSeenAndCanonicalCommit(t *testing.T) {
bs, _ := freshBlockStore()
height := uint64(2)
loadCommit := func() (interface{}, error) {
meta := bs.LoadSeenCommit(height)
return meta, nil
}
// Initially no contents.
// 1. Requesting for a non-existent blockMeta shouldn't fail
res, _, panicErr := doFn(loadCommit)
require.Nil(t, panicErr, "a non-existent blockMeta shouldn't cause a panic")
require.Nil(t, res, "a non-existent blockMeta should return nil")
// produce a few blocks and check that the correct seen and cannoncial commits
// are persisted.
for h := uint64(3); h <= 5; h++ {
c1 := bs.LoadSeenCommit(h)
require.Nil(t, c1)
c2 := bs.LoadBlockCommit(h - 1)
require.Nil(t, c2)
blockCommit := makeTestCommit(h-1, tmtime.Now())
block := makeBlock(h, state, blockCommit)
partSet := block.MakePartSet(2)
seenCommit := makeTestCommit(h, tmtime.Now())
bs.SaveBlock(block, partSet, seenCommit)
c3 := bs.LoadSeenCommit(h)
require.Equal(t, seenCommit.Hash(), c3.Hash())
// the previous seen commit should be removed
c4 := bs.LoadSeenCommit(h - 1)
require.Nil(t, c4)
c5 := bs.LoadBlockCommit(h)
require.Nil(t, c5)
c6 := bs.LoadBlockCommit(h - 1)
require.Equal(t, blockCommit.Hash(), c6.Hash())
}
}
func doFn(fn func() (interface{}, error)) (res interface{}, err error, panicErr error) {
defer func() {
if r := recover(); r != nil {

View File

@@ -109,14 +109,7 @@ if [[ $APPEND_TX_CODE != 0 ]]; then
exit 1
fi
echo "... sending tx. expect error"
# second time should get rejected by the mempool (return error and non-zero code)
sendTx $TX true
echo "... sending tx. expect no error"
echo "... sending new tx. expect no error"
# now, TX=01 should pass, with no error
TX=01

View File

@@ -70,6 +70,8 @@ The test runner has the following stages, which can also be executed explicitly
* `cleanup`: removes configuration files and Docker containers/networks.
Auxiliary commands:
* `logs`: outputs all node logs.
* `tail`: tails (follows) node logs until canceled.
@@ -139,3 +141,11 @@ Docker does not enable IPv6 by default. To do so, enter the following in
"fixed-cidr-v6": "2001:db8:1::/64"
}
```
## Benchmarking testnets
It is also possible to run a simple benchmark on a testnet. This is done through the `benchmark` command. This manages the entire process: setting up the environment, starting the test net, waiting for a considerable number of blocks to be produced (currently 100), and then returning the following metrics from the sample of the blockchain:
- Average time to produce a block
- Standard deviation of producing a block
- Minimum and maximum time to produce a block

View File

@@ -14,6 +14,7 @@ type Config struct {
Listen string
Protocol string
Dir string
Mode string `toml:"mode"`
PersistInterval uint64 `toml:"persist_interval"`
SnapshotInterval uint64 `toml:"snapshot_interval"`
RetainBlocks uint64 `toml:"retain_blocks"`

View File

@@ -1,11 +1,14 @@
package main
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"github.com/spf13/viper"
@@ -17,12 +20,18 @@ import (
tmflags "github.com/tendermint/tendermint/libs/cli/flags"
"github.com/tendermint/tendermint/libs/log"
tmnet "github.com/tendermint/tendermint/libs/net"
"github.com/tendermint/tendermint/light"
lproxy "github.com/tendermint/tendermint/light/proxy"
lrpc "github.com/tendermint/tendermint/light/rpc"
dbs "github.com/tendermint/tendermint/light/store/db"
"github.com/tendermint/tendermint/node"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/privval"
grpcprivval "github.com/tendermint/tendermint/privval/grpc"
privvalproto "github.com/tendermint/tendermint/proto/tendermint/privval"
"github.com/tendermint/tendermint/proxy"
rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
)
var logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout))
@@ -66,10 +75,13 @@ func run(configFile string) error {
case "socket", "grpc":
err = startApp(cfg)
case "builtin":
if cfg.Mode == string(e2e.ModeLight) {
err = startLightClient(cfg)
} else {
err = startNode(cfg)
}
// FIXME: Temporarily remove maverick until it is redesigned
// if len(cfg.Misbehaviors) == 0 {
err = startNode(cfg)
// } else {
// err = startMaverick(cfg)
// }
default:
@@ -137,6 +149,63 @@ func startNode(cfg *Config) error {
return n.Start()
}
func startLightClient(cfg *Config) error {
tmcfg, nodeLogger, _, err := setupNode()
if err != nil {
return err
}
dbContext := &node.DBContext{ID: "light", Config: tmcfg}
lightDB, err := node.DefaultDBProvider(dbContext)
if err != nil {
return err
}
providers := rpcEndpoints(tmcfg.P2P.PersistentPeers)
c, err := light.NewHTTPClient(
context.Background(),
cfg.ChainID,
light.TrustOptions{
Period: tmcfg.StateSync.TrustPeriod,
Height: tmcfg.StateSync.TrustHeight,
Hash: tmcfg.StateSync.TrustHashBytes(),
},
providers[0],
providers[1:],
dbs.New(lightDB),
light.Logger(nodeLogger),
)
if err != nil {
return err
}
rpccfg := rpcserver.DefaultConfig()
rpccfg.MaxBodyBytes = tmcfg.RPC.MaxBodyBytes
rpccfg.MaxHeaderBytes = tmcfg.RPC.MaxHeaderBytes
rpccfg.MaxOpenConnections = tmcfg.RPC.MaxOpenConnections
// If necessary adjust global WriteTimeout to ensure it's greater than
// TimeoutBroadcastTxCommit.
// See https://github.com/tendermint/tendermint/issues/3435
if rpccfg.WriteTimeout <= tmcfg.RPC.TimeoutBroadcastTxCommit {
rpccfg.WriteTimeout = tmcfg.RPC.TimeoutBroadcastTxCommit + 1*time.Second
}
p, err := lproxy.NewProxy(c, tmcfg.RPC.ListenAddress, providers[0], rpccfg, nodeLogger,
lrpc.KeyPathFn(lrpc.DefaultMerkleKeyPathFn()))
if err != nil {
return err
}
logger.Info("Starting proxy...", "laddr", tmcfg.RPC.ListenAddress)
if err := p.ListenAndServe(); err != http.ErrServerClosed {
// Error starting or closing listener:
logger.Error("proxy ListenAndServe", "err", err)
}
return nil
}
// FIXME: Temporarily disconnected maverick until it is redesigned
// startMaverick starts a Maverick node that runs the application directly. It assumes the Tendermint
// configuration is in $TMHOME/config/tendermint.toml.
@@ -176,7 +245,10 @@ func startNode(cfg *Config) error {
// startSigner starts a signer server connecting to the given endpoint.
func startSigner(cfg *Config) error {
filePV := privval.LoadFilePV(cfg.PrivValKey, cfg.PrivValState)
filePV, err := privval.LoadFilePV(cfg.PrivValKey, cfg.PrivValState)
if err != nil {
return err
}
protocol, address := tmnet.ProtocolAndAddress(cfg.PrivValServer)
var dialFn privval.SocketDialer
@@ -210,7 +282,7 @@ func startSigner(cfg *Config) error {
endpoint := privval.NewSignerDialerEndpoint(logger, dialFn,
privval.SignerDialerEndpointRetryWaitInterval(1*time.Second),
privval.SignerDialerEndpointConnRetries(100))
err := privval.NewSignerServer(endpoint, cfg.ChainID, filePV).Start()
err = privval.NewSignerServer(endpoint, cfg.ChainID, filePV).Start()
if err != nil {
return err
}
@@ -264,3 +336,21 @@ func setupNode() (*config.Config, log.Logger, *p2p.NodeKey, error) {
return tmcfg, nodeLogger, &nodeKey, nil
}
// rpcEndpoints takes a list of persistent peers and splits them into a list of rpc endpoints
// using 26657 as the port number
func rpcEndpoints(peers string) []string {
arr := strings.Split(peers, ",")
endpoints := make([]string, len(arr))
for i, v := range arr {
addr, err := p2p.ParseNodeAddress(v)
if err != nil {
panic(err)
}
// use RPC port instead
addr.Port = 26657
rpcEndpoint := "http://" + addr.Hostname + ":" + fmt.Sprint(addr.Port)
endpoints[i] = rpcEndpoint
}
return endpoints
}

View File

@@ -77,7 +77,7 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
KeyType: opt["keyType"].(string),
}
var numSeeds, numValidators, numFulls int
var numSeeds, numValidators, numFulls, numLightClients int
switch opt["topology"].(string) {
case "single":
numValidators = 1
@@ -85,7 +85,8 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
numValidators = 4
case "large":
// FIXME Networks are kept small since large ones use too much CPU.
numSeeds = r.Intn(4)
numSeeds = r.Intn(3)
numLightClients = r.Intn(3)
numValidators = 4 + r.Intn(7)
numFulls = r.Intn(5)
default:
@@ -143,6 +144,13 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
r, e2e.ModeFull, startAt, manifest.InitialHeight, false)
}
for i := 1; i <= numLightClients; i++ {
startAt := manifest.InitialHeight + 5
manifest.Nodes[fmt.Sprintf("light%02d", i)] = generateNode(
r, e2e.ModeLight, startAt+(5*uint64(i)), manifest.InitialHeight, false,
)
}
// We now set up peer discovery for nodes. Seed nodes are fully meshed with
// each other, while non-seed nodes either use a set of random seeds or a
// set of random peers that start before themselves.
@@ -175,6 +183,10 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
}
})
for i, name := range peerNames {
// we skip over light clients - they connect to all peers initially
if manifest.Nodes[name].Mode == string(e2e.ModeLight) {
continue
}
if len(seedNames) > 0 && (i == 0 || r.Float64() >= 0.5) {
manifest.Nodes[name].Seeds = uniformSetChoice(seedNames).Choose(r)
} else if i > 0 {
@@ -213,7 +225,7 @@ func generateNode(
node.SnapshotInterval = 3
}
if node.Mode == "validator" {
if node.Mode == string(e2e.ModeValidator) {
misbehaveAt := startAt + 5 + uint64(r.Intn(10))
if startAt == 0 {
misbehaveAt += initialHeight - 1
@@ -224,6 +236,11 @@ func generateNode(
}
}
if node.Mode == string(e2e.ModeLight) {
node.ABCIProtocol = "builtin"
node.StateSync = false
}
// If a node which does not persist state also does not retain blocks, randomly
// choose to either persist state or retain all blocks.
if node.PersistInterval != nil && *node.PersistInterval == 0 && node.RetainBlocks > 0 {

View File

@@ -90,3 +90,13 @@ fast_sync = "v0"
state_sync = true
seeds = ["seed01"]
perturb = ["restart"]
[node.light01]
mode= "light"
start_at= 1005
persistent_peers = ["validator01", "validator02", "validator03"]
[node.light02]
mode= "light"
start_at= 1015
persistent_peers = ["validator04", "full01", "validator05"]

View File

@@ -2,4 +2,4 @@
[node.validator02]
[node.validator03]
[node.validator04]

View File

@@ -58,9 +58,9 @@ type Manifest struct {
// ManifestNode represents a node in a testnet manifest.
type ManifestNode struct {
// Mode specifies the type of node: "validator", "full", or "seed". Defaults to
// "validator". Full nodes do not get a signing key (a dummy key is generated),
// and seed nodes run in seed mode with the PEX reactor enabled.
// Mode specifies the type of node: "validator", "full", "light" or "seed".
// Defaults to "validator". Full nodes do not get a signing key (a dummy key
// is generated), and seed nodes run in seed mode with the PEX reactor enabled.
Mode string `toml:"mode"`
// Seeds is the list of node names to use as P2P seed nodes. Defaults to none.
@@ -68,7 +68,8 @@ type ManifestNode struct {
// PersistentPeers is a list of node names to maintain persistent P2P
// connections to. If neither seeds nor persistent peers are specified,
// this defaults to all other nodes in the network.
// this defaults to all other nodes in the network. For light clients,
// this relates to the providers the light client is connected to.
PersistentPeers []string `toml:"persistent_peers"`
// Database specifies the database backend: "goleveldb", "cleveldb",

View File

@@ -33,6 +33,7 @@ type Perturbation string
const (
ModeValidator Mode = "validator"
ModeFull Mode = "full"
ModeLight Mode = "light"
ModeSeed Mode = "seed"
ProtocolBuiltin Protocol = "builtin"
@@ -151,7 +152,7 @@ func LoadTestnet(file string) (*Testnet, error) {
ProxyPort: proxyPortGen.Next(),
Mode: ModeValidator,
Database: "goleveldb",
ABCIProtocol: ProtocolUNIX,
ABCIProtocol: ProtocolBuiltin,
PrivvalProtocol: ProtocolFile,
StartAt: nodeManifest.StartAt,
FastSync: nodeManifest.FastSync,
@@ -327,6 +328,9 @@ func (n Node) Validate(testnet Testnet) error {
default:
return fmt.Errorf("invalid ABCI protocol setting %q", n.ABCIProtocol)
}
if n.Mode == ModeLight && n.ABCIProtocol != ProtocolBuiltin {
return errors.New("light client must use builtin protocol")
}
switch n.PrivvalProtocol {
case ProtocolFile, ProtocolTCP, ProtocolGRPC, ProtocolUNIX:
default:
@@ -402,7 +406,7 @@ func (t Testnet) LookupNode(name string) *Node {
func (t Testnet) ArchiveNodes() []*Node {
nodes := []*Node{}
for _, node := range t.Nodes {
if node.Mode != ModeSeed && node.StartAt == 0 && node.RetainBlocks == 0 {
if !node.Stateless() && node.StartAt == 0 && node.RetainBlocks == 0 {
nodes = append(nodes, node)
}
}
@@ -476,6 +480,11 @@ func (n Node) Client() (*rpchttp.HTTP, error) {
return rpchttp.New(fmt.Sprintf("http://127.0.0.1:%v", n.ProxyPort))
}
// Stateless returns true if the node is either a seed node or a light node
func (n Node) Stateless() bool {
return n.Mode == ModeLight || n.Mode == ModeSeed
}
// keyGenerator generates pseudorandom Ed25519 keys based on a seed.
type keyGenerator struct {
random *rand.Rand

View File

@@ -0,0 +1,192 @@
package main
import (
"context"
"fmt"
"math"
"time"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
"github.com/tendermint/tendermint/types"
)
// Benchmark is a simple function for fetching, calculating and printing
// the following metrics:
// 1. Average block production time
// 2. Block interval standard deviation
// 3. Max block interval (slowest block)
// 4. Min block interval (fastest block)
//
// Metrics are based of the `benchmarkLength`, the amount of consecutive blocks
// sampled from in the testnet
func Benchmark(testnet *e2e.Testnet, benchmarkLength uint64) error {
block, _, err := waitForHeight(testnet, 0)
if err != nil {
return err
}
logger.Info("Beginning benchmark period...", "height", block.Height)
// wait for the length of the benchmark period in blocks to pass. We allow 5 seconds for each block
// which should be sufficient.
waitingTime := time.Duration(benchmarkLength*5) * time.Second
endHeight, err := waitForAllNodes(testnet, block.Height+benchmarkLength, waitingTime)
if err != nil {
return err
}
logger.Info("Ending benchmark period", "height", endHeight)
// fetch a sample of blocks
blocks, err := fetchBlockChainSample(testnet, benchmarkLength)
if err != nil {
return err
}
// slice into time intervals and collate data
timeIntervals := splitIntoBlockIntervals(blocks)
testnetStats := extractTestnetStats(timeIntervals)
testnetStats.startHeight = blocks[0].Header.Height
testnetStats.endHeight = blocks[len(blocks)-1].Header.Height
// print and return
logger.Info(testnetStats.String())
return nil
}
type testnetStats struct {
startHeight uint64
endHeight uint64
// average time to produce a block
mean time.Duration
// standard deviation of block production
std float64
// longest time to produce a block
max time.Duration
// shortest time to produce a block
min time.Duration
}
func (t *testnetStats) String() string {
return fmt.Sprintf(`Benchmarked from height %v to %v
Mean Block Interval: %v
Standard Deviation: %f
Max Block Interval: %v
Min Block Interval: %v
`,
t.startHeight,
t.endHeight,
t.mean,
t.std,
t.max,
t.min,
)
}
// fetchBlockChainSample waits for `benchmarkLength` amount of blocks to pass, fetching
// all of the headers for these blocks from an archive node and returning it.
func fetchBlockChainSample(testnet *e2e.Testnet, benchmarkLength uint64) ([]*types.BlockMeta, error) {
var blocks []*types.BlockMeta
// Find the first archive node
archiveNode := testnet.ArchiveNodes()[0]
c, err := archiveNode.Client()
if err != nil {
return nil, err
}
// find the latest height
ctx := context.Background()
s, err := c.Status(ctx)
if err != nil {
return nil, err
}
to := s.SyncInfo.LatestBlockHeight
from := to - benchmarkLength + 1
if from <= testnet.InitialHeight {
return nil, fmt.Errorf("tesnet was unable to reach required height for benchmarking (latest height %d)", to)
}
// Fetch blocks
for from < to {
// fetch the blockchain metas. Currently we can only fetch 20 at a time
resp, err := c.BlockchainInfo(ctx, from, min(from+19, to))
if err != nil {
return nil, err
}
blockMetas := resp.BlockMetas
// we receive blocks in descending order so we have to add them in reverse
for i := len(blockMetas) - 1; i >= 0; i-- {
if blockMetas[i].Header.Height != from {
return nil, fmt.Errorf("node gave us another header. Wanted %d, got %d",
from,
blockMetas[i].Header.Height,
)
}
from++
blocks = append(blocks, blockMetas[i])
}
}
return blocks, nil
}
func splitIntoBlockIntervals(blocks []*types.BlockMeta) []time.Duration {
intervals := make([]time.Duration, len(blocks)-1)
lastTime := blocks[0].Header.Time
for i, block := range blocks {
// skip the first block
if i == 0 {
continue
}
intervals[i-1] = block.Header.Time.Sub(lastTime)
lastTime = block.Header.Time
}
return intervals
}
func extractTestnetStats(intervals []time.Duration) testnetStats {
var (
sum, mean time.Duration
std float64
max = intervals[0]
min = intervals[0]
)
for _, interval := range intervals {
sum += interval
if interval > max {
max = interval
}
if interval < min {
min = interval
}
}
mean = sum / time.Duration(len(intervals))
for _, interval := range intervals {
diff := (interval - mean).Seconds()
std += math.Pow(diff, 2)
}
std = math.Sqrt(std / float64(len(intervals)))
return testnetStats{
mean: mean,
std: std,
max: max,
min: min,
}
}
func min(a, b uint64) uint64 {
if a > b {
return b
}
return a
}

View File

@@ -218,6 +218,63 @@ func NewCLI() *CLI {
},
})
cli.root.AddCommand(&cobra.Command{
Use: "benchmark",
Short: "Benchmarks testnet",
Long: `Benchmarks the following metrics:
Mean Block Interval
Standard Deviation
Min Block Interval
Max Block Interval
over a 100 block sampling period.
Does not run any perbutations.
`,
RunE: func(cmd *cobra.Command, args []string) error {
if err := Cleanup(cli.testnet); err != nil {
return err
}
if err := Setup(cli.testnet); err != nil {
return err
}
chLoadResult := make(chan error)
ctx, loadCancel := context.WithCancel(context.Background())
defer loadCancel()
go func() {
err := Load(ctx, cli.testnet)
if err != nil {
logger.Error(fmt.Sprintf("Transaction load failed: %v", err.Error()))
}
chLoadResult <- err
}()
if err := Start(cli.testnet); err != nil {
return err
}
if err := Wait(cli.testnet, 5); err != nil { // allow some txs to go through
return err
}
// we benchmark performance over the next 100 blocks
if err := Benchmark(cli.testnet, 100); err != nil {
return err
}
loadCancel()
if err := <-chLoadResult; err != nil {
return err
}
if err := Cleanup(cli.testnet); err != nil {
return err
}
return nil
},
})
return cli
}

View File

@@ -86,24 +86,27 @@ func waitForNode(node *e2e.Node, height uint64, timeout time.Duration) (*rpctype
return status, nil
}
time.Sleep(200 * time.Millisecond)
time.Sleep(300 * time.Millisecond)
}
}
// waitForAllNodes waits for all nodes to become available and catch up to the given block height.
func waitForAllNodes(testnet *e2e.Testnet, height uint64, timeout time.Duration) (uint64, error) {
lastHeight := uint64(0)
var lastHeight uint64
for _, node := range testnet.Nodes {
if node.Mode == e2e.ModeSeed {
continue
}
status, err := waitForNode(node, height, 20*time.Second)
status, err := waitForNode(node, height, timeout)
if err != nil {
return 0, err
}
if status.SyncInfo.LatestBlockHeight > lastHeight {
lastHeight = status.SyncInfo.LatestBlockHeight
}
}
return lastHeight, nil
}

View File

@@ -65,23 +65,23 @@ func Setup(testnet *e2e.Testnet) error {
for _, node := range testnet.Nodes {
nodeDir := filepath.Join(testnet.Dir, node.Name)
dirs := []string{
filepath.Join(nodeDir, "config"),
filepath.Join(nodeDir, "data"),
filepath.Join(nodeDir, "data", "app"),
}
for _, dir := range dirs {
// light clients don't need an app directory
if node.Mode == e2e.ModeLight && strings.Contains(dir, "app") {
continue
}
err := os.MkdirAll(dir, 0755)
if err != nil {
return err
}
}
err = genesis.SaveAs(filepath.Join(nodeDir, "config", "genesis.json"))
if err != nil {
return err
}
cfg, err := MakeConfig(node)
if err != nil {
return err
@@ -97,6 +97,16 @@ func Setup(testnet *e2e.Testnet) error {
return err
}
if node.Mode == e2e.ModeLight {
// stop early if a light client
continue
}
err = genesis.SaveAs(filepath.Join(nodeDir, "config", "genesis.json"))
if err != nil {
return err
}
err = (&p2p.NodeKey{PrivKey: node.NodeKey}).SaveAs(filepath.Join(nodeDir, "config", "node_key.json"))
if err != nil {
return err
@@ -168,8 +178,7 @@ services:
image: tendermint/e2e-node
{{- if eq .ABCIProtocol "builtin" }}
entrypoint: /usr/bin/entrypoint-builtin
{{- end }}
{{- if ne .ABCIProtocol "builtin"}}
{{- else }}
command: {{ startCommands .Misbehaviors .LogLevel }}
{{- end }}
init: true
@@ -289,7 +298,7 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
case e2e.ModeSeed:
cfg.P2P.SeedMode = true
cfg.P2P.PexReactor = true
case e2e.ModeFull:
case e2e.ModeFull, e2e.ModeLight:
// Don't need to do anything, since we're using a dummy privval key by default.
default:
return nil, fmt.Errorf("unexpected mode %q", node.Mode)
@@ -338,6 +347,8 @@ func MakeAppConfig(node *e2e.Node) ([]byte, error) {
"chain_id": node.Testnet.Name,
"dir": "data/app",
"listen": AppAddressUNIX,
"mode": node.Mode,
"proxy_port": node.ProxyPort,
"protocol": "socket",
"persist_interval": node.PersistInterval,
"snapshot_interval": node.SnapshotInterval,

View File

@@ -58,7 +58,7 @@ func Start(testnet *e2e.Testnet) error {
// Update any state sync nodes with a trusted height and hash
for _, node := range nodeQueue {
if node.StateSync {
if node.StateSync || node.Mode == e2e.ModeLight {
err = UpdateConfigStateSync(node, block.Height, blockID.Hash.Bytes())
if err != nil {
return err

View File

@@ -20,9 +20,15 @@ func Wait(testnet *e2e.Testnet, blocks uint64) error {
// WaitUntil waits until a given height has been reached.
func WaitUntil(testnet *e2e.Testnet, height uint64) error {
logger.Info(fmt.Sprintf("Waiting for all nodes to reach height %v...", height))
_, err := waitForAllNodes(testnet, height, 20*time.Second)
_, err := waitForAllNodes(testnet, height, waitingTime(len(testnet.Nodes)))
if err != nil {
return err
}
return nil
}
// waitingTime estimates how long it should take for a node to reach the height.
// More nodes in a network implies we may expect a slower network and may have to wait longer.
func waitingTime(nodes int) time.Duration {
return time.Duration(20+(nodes*4)) * time.Second
}

View File

@@ -16,7 +16,7 @@ import (
// Tests that any initial state given in genesis has made it into the app.
func TestApp_InitialState(t *testing.T) {
testNode(t, func(t *testing.T, node e2e.Node) {
if node.Mode == e2e.ModeSeed {
if node.Stateless() {
return
}
if len(node.Testnet.InitialState) == 0 {
@@ -81,12 +81,27 @@ func TestApp_Tx(t *testing.T) {
value := fmt.Sprintf("%x", bz)
tx := types.Tx(fmt.Sprintf("%v=%v", key, value))
_, err = client.BroadcastTxCommit(ctx, tx)
resp, err := client.BroadcastTxCommit(ctx, tx)
require.NoError(t, err)
resp, err := client.ABCIQuery(ctx, "", []byte(key))
// wait for the tx to be persisted in the tx indexer
time.Sleep(500 * time.Millisecond)
hash := tx.Hash()
txResp, err := client.Tx(ctx, hash, false)
require.NoError(t, err)
assert.Equal(t, key, string(resp.Response.Key))
assert.Equal(t, value, string(resp.Response.Value))
assert.Equal(t, txResp.Tx, tx)
assert.Equal(t, txResp.Height, resp.Height)
// NOTE: we don't test abci query of the light client
if node.Mode == e2e.ModeLight {
return
}
abciResp, err := client.ABCIQuery(ctx, "", []byte(key))
require.NoError(t, err)
assert.Equal(t, key, string(abciResp.Response.Key))
assert.Equal(t, value, string(abciResp.Response.Value))
})
}

View File

@@ -2,6 +2,7 @@ package pex
import (
"net"
"os"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto/ed25519"
@@ -13,32 +14,38 @@ import (
)
var (
pexR *pex.Reactor
peer p2p.Peer
pexR *pex.Reactor
peer p2p.Peer
logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout))
)
func init() {
addrB := pex.NewAddrBook("./testdata/addrbook1", false)
pexR := pex.NewReactor(addrB, &pex.ReactorConfig{SeedMode: false})
if pexR == nil {
panic("NewReactor returned nil")
}
pexR.SetLogger(log.NewNopLogger())
peer := newFuzzPeer()
pexR = pex.NewReactor(addrB, &pex.ReactorConfig{SeedMode: false})
pexR.SetLogger(logger)
peer = newFuzzPeer()
pexR.AddPeer(peer)
}
func Fuzz(data []byte) int {
// MakeSwitch uses log.TestingLogger which can't be executed in init()
cfg := config.DefaultP2PConfig()
cfg.PexReactor = true
sw := p2p.MakeSwitch(cfg, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
return sw
})
}, logger)
pexR.SetSwitch(sw)
}
func Fuzz(data []byte) int {
if len(data) == 0 {
return -1
}
pexR.Receive(pex.PexChannel, peer, data)
if !peer.IsRunning() {
// do not increase priority for msgs which lead to peer being stopped
return 0
}
return 1
}
@@ -64,15 +71,15 @@ var defaultNodeInfo = p2p.NodeInfo{
0,
),
NodeID: nodeID,
ListenAddr: "0.0.0.0:98992",
ListenAddr: "127.0.0.1:0",
Moniker: "foo1",
}
func (fp *fuzzPeer) FlushStop() {}
func (fp *fuzzPeer) ID() p2p.NodeID { return nodeID }
func (fp *fuzzPeer) RemoteIP() net.IP { return net.IPv4(0, 0, 0, 0) }
func (fp *fuzzPeer) RemoteIP() net.IP { return net.IPv4(198, 163, 190, 214) }
func (fp *fuzzPeer) RemoteAddr() net.Addr {
return &net.TCPAddr{IP: fp.RemoteIP(), Port: 98991, Zone: ""}
return &net.TCPAddr{IP: fp.RemoteIP(), Port: 26656, Zone: ""}
}
func (fp *fuzzPeer) IsOutbound() bool { return false }
func (fp *fuzzPeer) IsPersistent() bool { return false }

View File

@@ -17,18 +17,40 @@ func Fuzz(data []byte) int {
}
fooConn, barConn := makeSecretConnPair()
n, err := fooConn.Write(data)
if err != nil {
panic(err)
// Run Write in a separate goroutine because if data is greater than 1024
// bytes, each Write must be followed by Read (see io.Pipe documentation).
go func() {
// Copy data because Write modifies the slice.
dataToWrite := make([]byte, len(data))
copy(dataToWrite, data)
n, err := fooConn.Write(dataToWrite)
if err != nil {
panic(err)
}
if n < len(data) {
panic(fmt.Sprintf("wanted to write %d bytes, but %d was written", len(data), n))
}
}()
dataRead := make([]byte, len(data))
totalRead := 0
for totalRead < len(data) {
buf := make([]byte, len(data)-totalRead)
m, err := barConn.Read(buf)
if err != nil {
panic(err)
}
copy(dataRead[totalRead:], buf[:m])
totalRead += m
log.Printf("total read: %d", totalRead)
}
dataRead := make([]byte, n)
m, err := barConn.Read(dataRead)
if err != nil {
panic(err)
}
if !bytes.Equal(data[:n], dataRead[:m]) {
panic(fmt.Sprintf("bytes written %X != read %X", data[:n], dataRead[:m]))
if !bytes.Equal(data, dataRead) {
panic("bytes written != read")
}
return 1
}

View File

@@ -6,6 +6,7 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"github.com/tendermint/tendermint/libs/log"
rs "github.com/tendermint/tendermint/rpc/jsonrpc/server"
@@ -18,13 +19,16 @@ var rpcFuncMap = map[string]*rs.RPCFunc{
var mux *http.ServeMux
func init() {
mux := http.NewServeMux()
buf := new(bytes.Buffer)
lgr := log.NewTMLogger(buf)
mux = http.NewServeMux()
lgr := log.NewTMLogger(log.NewSyncWriter(os.Stdout))
rs.RegisterRPCFuncs(mux, rpcFuncMap, lgr)
}
func Fuzz(data []byte) int {
if len(data) == 0 {
return -1
}
req, _ := http.NewRequest("POST", "http://localhost/", bytes.NewReader(data))
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)
@@ -36,9 +40,11 @@ func Fuzz(data []byte) int {
if err := res.Body.Close(); err != nil {
panic(err)
}
recv := new(types.RPCResponse)
if err := json.Unmarshal(blob, recv); err != nil {
panic(err)
if len(blob) > 0 {
recv := new(types.RPCResponse)
if err := json.Unmarshal(blob, recv); err != nil {
panic(err)
}
}
return 1
}

View File

@@ -94,7 +94,10 @@ func NewTestHarness(logger log.Logger, cfg TestHarnessConfig) (*TestHarness, err
logger.Info("Loading private validator configuration", "keyFile", keyFile, "stateFile", stateFile)
// NOTE: LoadFilePV ultimately calls os.Exit on failure. No error will be
// returned if this call fails.
fpv := privval.LoadFilePV(keyFile, stateFile)
fpv, err := privval.LoadFilePV(keyFile, stateFile)
if err != nil {
return nil, err
}
genesisFile := ExpandPath(cfg.GenesisFile)
logger.Info("Loading chain ID from genesis file", "genesisFile", genesisFile)

View File

@@ -134,7 +134,11 @@ func runTestHarness(acceptRetries int, bindAddr, tmhome string) {
func extractKey(tmhome, outputPath string) {
keyFile := filepath.Join(internal.ExpandPath(tmhome), "config", "priv_validator_key.json")
stateFile := filepath.Join(internal.ExpandPath(tmhome), "data", "priv_validator_state.json")
fpv := privval.LoadFilePV(keyFile, stateFile)
fpv, err := privval.LoadFilePV(keyFile, stateFile)
if err != nil {
logger.Error("Can't load file pv", "err", err)
os.Exit(1)
}
pkb := []byte(fpv.Key.PrivKey.(ed25519.PrivKey))
if err := ioutil.WriteFile(internal.ExpandPath(outputPath), pkb[:32], 0600); err != nil {
logger.Info("Failed to write private key", "output", outputPath, "err", err)

View File

@@ -350,6 +350,7 @@ type Header struct {
ConsensusHash tmbytes.HexBytes `json:"consensus_hash"` // consensus params for current block
AppHash tmbytes.HexBytes `json:"app_hash"` // state after txs from the previous block
// root hash of all results from the txs from the previous block
// see `deterministicResponseDeliverTx` to understand which parts of a tx is hashed into here
LastResultsHash tmbytes.HexBytes `json:"last_results_hash"`
// consensus info