Mirror of https://github.com/tendermint/tendermint.git (synced 2026-01-16 17:52:50 +00:00)

Compare commits: finalizeBl...e2e-small- (33 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | a57953ac57 |  |
|  | cabd916517 |  |
|  | 363ea56680 |  |
|  | aa4854ff8f |  |
|  | 581dd01d47 |  |
|  | 50b00dff71 |  |
|  | 051e127d38 |  |
|  | 5530726df8 |  |
|  | decac693ab |  |
|  | 7ca0f24040 |  |
|  | 69848bef26 |  |
|  | 2c14d491f6 |  |
|  | cd248576ea |  |
|  | c256edc622 |  |
|  | 9d9360774f |  |
|  | c7c11fc7d5 |  |
|  | 37bc1d74df |  |
|  | d882f31569 |  |
|  | ba3f7106b1 |  |
|  | 3ccfb26137 |  |
|  | 96863decca |  |
|  | d4cda544ae |  |
|  | 800cce80b7 |  |
|  | e850863296 |  |
|  | 1dec3e139a |  |
|  | 11b920480f |  |
|  | 4f8bcb1cce |  |
|  | 2d95e38986 |  |
|  | 6bb4b688e0 |  |
|  | a1e1e6c290 |  |
|  | 736364178a |  |
|  | a99c7188d7 |  |
|  | a56b10fbef |  |
.github/CODEOWNERS (vendored, 2 changes)

@@ -7,5 +7,5 @@
 # global owners are only requested if there isn't a more specific
 # codeowner specified below. For this reason, the global codeowners
 # are often repeated in package-level definitions.
-* @alexanderbez @ebuchman @cmwaters @tessr @tychoish
+* @alexanderbez @ebuchman @cmwaters @tessr @tychoish @williambanfield
.github/workflows/linkchecker.yml (vendored, 2 changes)

@@ -7,6 +7,6 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v2.3.4
-      - uses: gaurav-nelson/github-action-markdown-link-check@1.0.12
+      - uses: gaurav-nelson/github-action-markdown-link-check@1.0.13
         with:
           folder-path: "docs"
.github/workflows/tests.yml (vendored, 32 changes)

@@ -42,38 +42,6 @@ jobs:
           key: ${{ runner.os }}-${{ github.sha }}-tm-binary
         if: env.GIT_DIFF
 
-  test_abci_apps:
-    runs-on: ubuntu-latest
-    needs: build
-    timeout-minutes: 5
-    steps:
-      - uses: actions/setup-go@v2
-        with:
-          go-version: "1.16"
-      - uses: actions/checkout@v2.3.4
-      - uses: technote-space/get-diff-action@v4
-        with:
-          PATTERNS: |
-            **/**.go
-            go.mod
-            go.sum
-      - uses: actions/cache@v2.1.6
-        with:
-          path: ~/go/pkg/mod
-          key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
-          restore-keys: |
-            ${{ runner.os }}-go-
-        if: env.GIT_DIFF
-      - uses: actions/cache@v2.1.6
-        with:
-          path: ~/go/bin
-          key: ${{ runner.os }}-${{ github.sha }}-tm-binary
-        if: env.GIT_DIFF
-      - name: test_abci_apps
-        run: abci/tests/test_app/test.sh
-        shell: bash
-        if: env.GIT_DIFF
-
   test_abci_cli:
     runs-on: ubuntu-latest
     needs: build
.gitignore (vendored, 2 changes)

@@ -15,7 +15,7 @@
 .vagrant
 .vendor-new/
 .vscode/
-abci-cli
+abci/abci-cli
 addrbook.json
 artifacts/*
 build/*
CHANGELOG_PENDING.md

@@ -22,6 +22,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
 - [config] \#6462 Move `PrivValidator` configuration out of `BaseConfig` into its own section. (@tychoish)
 - [rpc] \#6610 Add MaxPeerBlockHeight into /status rpc call (@JayT106)
 - [libs/CList] \#6626 Automatically detach the prev/next elements in Remove function (@JayT106)
+- [fastsync/rpc] \#6620 Add TotalSyncedTime & RemainingTime to SyncInfo in /status RPC (@JayT106)
 
 - Apps
   - [ABCI] \#6408 Change the `key` and `value` fields from `[]byte` to `string` in the `EventAttribute` type. (@alexanderbez)

@@ -30,6 +31,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
 - [ABCI] \#5818 Use protoio for msg length delimitation. Migrates from int64 to uint64 length delimiters.
 - [Version] \#6494 `TMCoreSemVer` has been renamed to `TMVersion`.
   - It is not required any longer to set ldflags to set version strings
+- [abci/counter] \#6684 Delete counter example app
 
 - P2P Protocol

@@ -147,3 +149,5 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
 - [rpc] \#6507 fix RPC client doesn't handle url's without ports (@JayT106)
 - [statesync] \#6463 Adds Reverse Sync feature to fetch historical light blocks after state sync in order to verify any evidence (@cmwaters)
 - [fastsync] \#6590 Update the metrics during fast-sync (@JayT106)
+- [gitignore] \#6668 Fix gitignore of abci-cli (@tanyabouman)
+- [light] \#6687 Fix bug with incorrecly handled contexts in the light client (@cmwaters)
Makefile (2 changes)

@@ -202,7 +202,7 @@ format:
 
 lint:
 	@echo "--> Running linter"
-	@golangci-lint run
+	go run github.com/golangci/golangci-lint/cmd/golangci-lint run
 .PHONY: lint
 
 DESTINATION = ./index.html.md
abci/cmd/abci-cli/abci-cli.go

@@ -17,7 +17,6 @@ import (
 
 	abcicli "github.com/tendermint/tendermint/abci/client"
 	"github.com/tendermint/tendermint/abci/example/code"
-	"github.com/tendermint/tendermint/abci/example/counter"
 	"github.com/tendermint/tendermint/abci/example/kvstore"
 	"github.com/tendermint/tendermint/abci/server"
 	servertest "github.com/tendermint/tendermint/abci/tests/server"

@@ -47,9 +46,6 @@ var (
 	flagHeight int
 	flagProve  bool
 
-	// counter
-	flagSerial bool
-
 	// kvstore
 	flagPersist string
 )

@@ -61,9 +57,7 @@ var RootCmd = &cobra.Command{
 	PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
 
 		switch cmd.Use {
-		case "counter", "kvstore": // for the examples apps, don't pre-run
-			return nil
-		case "version": // skip running for version command
+		case "kvstore", "version":
 			return nil
 		}

@@ -135,10 +129,6 @@ func addQueryFlags() {
 		"whether or not to return a merkle proof of the query result")
 }
 
-func addCounterFlags() {
-	counterCmd.PersistentFlags().BoolVarP(&flagSerial, "serial", "", false, "enforce incrementing (serial) transactions")
-}
-
 func addKVStoreFlags() {
 	kvstoreCmd.PersistentFlags().StringVarP(&flagPersist, "persist", "", "", "directory to use for a database")
 }

@@ -157,8 +147,6 @@ func addCommands() {
 	RootCmd.AddCommand(queryCmd)
 
 	// examples
-	addCounterFlags()
-	RootCmd.AddCommand(counterCmd)
 	addKVStoreFlags()
 	RootCmd.AddCommand(kvstoreCmd)
 }

@@ -258,14 +246,6 @@ var queryCmd = &cobra.Command{
 	RunE: cmdQuery,
 }
 
-var counterCmd = &cobra.Command{
-	Use:   "counter",
-	Short: "ABCI demo example",
-	Long:  "ABCI demo example",
-	Args:  cobra.ExactArgs(0),
-	RunE:  cmdCounter,
-}
-
 var kvstoreCmd = &cobra.Command{
 	Use:   "kvstore",
 	Short: "ABCI demo example",

@@ -593,32 +573,6 @@ func cmdQuery(cmd *cobra.Command, args []string) error {
 	return nil
 }
 
-func cmdCounter(cmd *cobra.Command, args []string) error {
-	app := counter.NewApplication(flagSerial)
-	logger := log.MustNewDefaultLogger(log.LogFormatPlain, log.LogLevelInfo, false)
-
-	// Start the listener
-	srv, err := server.NewServer(flagAddress, flagAbci, app)
-	if err != nil {
-		return err
-	}
-	srv.SetLogger(logger.With("module", "abci-server"))
-	if err := srv.Start(); err != nil {
-		return err
-	}
-
-	// Stop upon receiving SIGTERM or CTRL-C.
-	tmos.TrapSignal(logger, func() {
-		// Cleanup
-		if err := srv.Stop(); err != nil {
-			logger.Error("Error while stopping server", "err", err)
-		}
-	})
-
-	// Run forever.
-	select {}
-}
-
 func cmdKVStore(cmd *cobra.Command, args []string) error {
 	logger := log.MustNewDefaultLogger(log.LogFormatPlain, log.LogLevelInfo, false)
abci/example/counter/counter.go (deleted)

@@ -1,86 +0,0 @@
-package counter
-
-import (
-	"encoding/binary"
-	"fmt"
-
-	"github.com/tendermint/tendermint/abci/example/code"
-	"github.com/tendermint/tendermint/abci/types"
-)
-
-type Application struct {
-	types.BaseApplication
-
-	hashCount int
-	txCount   int
-	serial    bool
-}
-
-func NewApplication(serial bool) *Application {
-	return &Application{serial: serial}
-}
-
-func (app *Application) Info(req types.RequestInfo) types.ResponseInfo {
-	return types.ResponseInfo{Data: fmt.Sprintf("{\"hashes\":%v,\"txs\":%v}", app.hashCount, app.txCount)}
-}
-
-func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeliverTx {
-	if app.serial {
-		if len(req.Tx) > 8 {
-			return types.ResponseDeliverTx{
-				Code: code.CodeTypeEncodingError,
-				Log:  fmt.Sprintf("Max tx size is 8 bytes, got %d", len(req.Tx))}
-		}
-		tx8 := make([]byte, 8)
-		copy(tx8[len(tx8)-len(req.Tx):], req.Tx)
-		txValue := binary.BigEndian.Uint64(tx8)
-		if txValue != uint64(app.txCount) {
-			return types.ResponseDeliverTx{
-				Code: code.CodeTypeBadNonce,
-				Log:  fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.txCount, txValue)}
-		}
-	}
-	app.txCount++
-	return types.ResponseDeliverTx{Code: code.CodeTypeOK}
-}
-
-func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
-	if app.serial {
-		if len(req.Tx) > 8 {
-			return types.ResponseCheckTx{
-				Code: code.CodeTypeEncodingError,
-				Log:  fmt.Sprintf("Max tx size is 8 bytes, got %d", len(req.Tx))}
-		}
-
-		tx8 := make([]byte, 8)
-		copy(tx8[len(tx8)-len(req.Tx):], req.Tx)
-		txValue := binary.BigEndian.Uint64(tx8)
-		if txValue < uint64(app.txCount) {
-			return types.ResponseCheckTx{
-				Code: code.CodeTypeBadNonce,
-				Log:  fmt.Sprintf("Invalid nonce. Expected >= %v, got %v", app.txCount, txValue)}
-		}
-	}
-	return types.ResponseCheckTx{Code: code.CodeTypeOK}
-}
-
-func (app *Application) Commit() (resp types.ResponseCommit) {
-	app.hashCount++
-	if app.txCount == 0 {
-		return types.ResponseCommit{}
-	}
-	hash := make([]byte, 8)
-	binary.BigEndian.PutUint64(hash, uint64(app.txCount))
-	return types.ResponseCommit{Data: hash}
-}
-
-func (app *Application) Query(reqQuery types.RequestQuery) types.ResponseQuery {
-	switch reqQuery.Path {
-	case "hash":
-		return types.ResponseQuery{Value: []byte(fmt.Sprintf("%v", app.hashCount))}
-	case "tx":
-		return types.ResponseQuery{Value: []byte(fmt.Sprintf("%v", app.txCount))}
-	default:
-		return types.ResponseQuery{Log: fmt.Sprintf("Invalid query path. Expected hash or tx, got %v", reqQuery.Path)}
-	}
-}
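For reference, a minimal standalone sketch of the nonce decoding used by the deleted `DeliverTx`/`CheckTx` above: a transaction of up to 8 bytes is right-aligned into an 8-byte buffer and read as a big-endian integer. The sample inputs mirror the sequence exercised by the deleted test driver below; the helper name is ours, not the repo's.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// nonceOf reproduces the deleted counter app's decoding: zero-pad the tx
// on the left to 8 bytes, then read it as a big-endian uint64.
func nonceOf(tx []byte) uint64 {
	tx8 := make([]byte, 8)
	copy(tx8[len(tx8)-len(tx):], tx)
	return binary.BigEndian.Uint64(tx8)
}

func main() {
	fmt.Println(nonceOf([]byte{0x00}))             // 0
	fmt.Println(nonceOf([]byte{0x01}))             // 1
	fmt.Println(nonceOf([]byte{0x00, 0x02}))       // 2
	fmt.Println(nonceOf([]byte{0x00, 0x00, 0x04})) // 4
}
```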
@@ -1,55 +0,0 @@
-package main
-
-import (
-	"bytes"
-	"context"
-	"fmt"
-
-	abcicli "github.com/tendermint/tendermint/abci/client"
-	"github.com/tendermint/tendermint/abci/types"
-	"github.com/tendermint/tendermint/libs/log"
-)
-
-var ctx = context.Background()
-
-func startClient(abciType string) abcicli.Client {
-	// Start client
-	client, err := abcicli.NewClient("tcp://127.0.0.1:26658", abciType, true)
-	if err != nil {
-		panic(err.Error())
-	}
-	logger := log.MustNewDefaultLogger(log.LogFormatPlain, log.LogLevelInfo, false)
-	client.SetLogger(logger.With("module", "abcicli"))
-	if err := client.Start(); err != nil {
-		panicf("connecting to abci_app: %v", err.Error())
-	}
-
-	return client
-}
-
-func commit(client abcicli.Client, hashExp []byte) {
-	res, err := client.CommitSync(ctx)
-	if err != nil {
-		panicf("client error: %v", err)
-	}
-	if !bytes.Equal(res.Data, hashExp) {
-		panicf("Commit hash was unexpected. Got %X expected %X", res.Data, hashExp)
-	}
-}
-
-func deliverTx(client abcicli.Client, txBytes []byte, codeExp uint32, dataExp []byte) {
-	res, err := client.DeliverTxSync(ctx, types.RequestDeliverTx{Tx: txBytes})
-	if err != nil {
-		panicf("client error: %v", err)
-	}
-	if res.Code != codeExp {
-		panicf("DeliverTx response code was unexpected. Got %v expected %v. Log: %v", res.Code, codeExp, res.Log)
-	}
-	if !bytes.Equal(res.Data, dataExp) {
-		panicf("DeliverTx response data was unexpected. Got %X expected %X", res.Data, dataExp)
-	}
-}
-
-func panicf(format string, a ...interface{}) {
-	panic(fmt.Sprintf(format, a...))
-}
@@ -1,93 +0,0 @@
-package main
-
-import (
-	"fmt"
-	"log"
-	"os"
-	"os/exec"
-	"time"
-
-	"github.com/tendermint/tendermint/abci/types"
-)
-
-var abciType string
-
-func init() {
-	abciType = os.Getenv("ABCI")
-	if abciType == "" {
-		abciType = "socket"
-	}
-}
-
-func main() {
-	testCounter()
-}
-
-const (
-	maxABCIConnectTries = 10
-)
-
-func ensureABCIIsUp(typ string, n int) error {
-	var err error
-	cmdString := "abci-cli echo hello"
-	if typ == "grpc" {
-		cmdString = "abci-cli --abci grpc echo hello"
-	}
-
-	for i := 0; i < n; i++ {
-		cmd := exec.Command("bash", "-c", cmdString)
-		_, err = cmd.CombinedOutput()
-		if err == nil {
-			break
-		}
-		time.Sleep(500 * time.Millisecond)
-	}
-	return err
-}
-
-func testCounter() {
-	abciApp := os.Getenv("ABCI_APP")
-	if abciApp == "" {
-		panic("No ABCI_APP specified")
-	}
-
-	fmt.Printf("Running %s test with abci=%s\n", abciApp, abciType)
-	subCommand := fmt.Sprintf("abci-cli %s", abciApp)
-	cmd := exec.Command("bash", "-c", subCommand)
-	cmd.Stdout = os.Stdout
-	if err := cmd.Start(); err != nil {
-		log.Fatalf("starting %q err: %v", abciApp, err)
-	}
-	defer func() {
-		if err := cmd.Process.Kill(); err != nil {
-			log.Printf("error on process kill: %v", err)
-		}
-		if err := cmd.Wait(); err != nil {
-			log.Printf("error while waiting for cmd to exit: %v", err)
-		}
-	}()
-
-	if err := ensureABCIIsUp(abciType, maxABCIConnectTries); err != nil {
-		log.Fatalf("echo failed: %v", err) //nolint:gocritic
-	}
-
-	client := startClient(abciType)
-	defer func() {
-		if err := client.Stop(); err != nil {
-			log.Printf("error trying client stop: %v", err)
-		}
-	}()
-
-	// commit(client, nil)
-	// deliverTx(client, []byte("abc"), code.CodeTypeBadNonce, nil)
-	commit(client, nil)
-	deliverTx(client, []byte{0x00}, types.CodeTypeOK, nil)
-	commit(client, []byte{0, 0, 0, 0, 0, 0, 0, 1})
-	// deliverTx(client, []byte{0x00}, code.CodeTypeBadNonce, nil)
-	deliverTx(client, []byte{0x01}, types.CodeTypeOK, nil)
-	deliverTx(client, []byte{0x00, 0x02}, types.CodeTypeOK, nil)
-	deliverTx(client, []byte{0x00, 0x03}, types.CodeTypeOK, nil)
-	deliverTx(client, []byte{0x00, 0x00, 0x04}, types.CodeTypeOK, nil)
-	// deliverTx(client, []byte{0x00, 0x00, 0x06}, code.CodeTypeBadNonce, nil)
-	commit(client, []byte{0, 0, 0, 0, 0, 0, 0, 5})
-}
abci/tests/test_app/test.sh (deleted)

@@ -1,28 +0,0 @@
-#! /bin/bash
-set -e
-
-# These tests spawn the counter app and server by execing the ABCI_APP command and run some simple client tests against it
-
-# Get the directory of where this script is.
-export PATH="$GOBIN:$PATH"
-SOURCE="${BASH_SOURCE[0]}"
-while [ -h "$SOURCE" ] ; do SOURCE="$(readlink "$SOURCE")"; done
-DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
-
-# Change into that dir because we expect that.
-cd "$DIR"
-
-echo "RUN COUNTER OVER SOCKET"
-# test golang counter
-ABCI_APP="counter" go run -mod=readonly ./*.go
-echo "----------------------"
-
-echo "RUN COUNTER OVER GRPC"
-# test golang counter via grpc
-ABCI_APP="counter --abci=grpc" ABCI="grpc" go run -mod=readonly ./*.go
-echo "----------------------"
-
-# test nodejs counter
-# TODO: fix node app
-#ABCI_APP="node $GOPATH/src/github.com/tendermint/js-abci/example/app.js" go test -test.run TestCounter
@@ -37,7 +37,6 @@ function testExample() {
 }
 
 testExample 1 tests/test_cli/ex1.abci abci-cli kvstore
-testExample 2 tests/test_cli/ex2.abci abci-cli counter
 
 echo ""
 echo "PASS"
@@ -48,9 +48,7 @@ func AddNodeFlags(cmd *cobra.Command) {
 		"proxy-app",
 		config.ProxyApp,
 		"proxy app address, or one of: 'kvstore',"+
-			" 'persistent_kvstore',"+
-			" 'counter',"+
-			" 'counter_serial' or 'noop' for local testing.")
+			" 'persistent_kvstore' or 'noop' for local testing.")
 	cmd.Flags().String("abci", config.ABCI, "specify abci transport (socket | grpc)")
 
 	// rpc flags
@@ -31,7 +31,6 @@ Available Commands:
   check_tx    Validate a tx
   commit      Commit the application state and return the Merkle root hash
   console     Start an interactive abci console for multiple commands
-  counter     ABCI demo example
   deliver_tx  Deliver a new tx to the application
   kvstore     ABCI demo example
   echo        Have the application echo a message

@@ -214,137 +213,9 @@ we do `deliver_tx "abc=efg"` it will store `(abc, efg)`.
 Similarly, you could put the commands in a file and run
 `abci-cli --verbose batch < myfile`.
 
-## Counter - Another Example
-
-Now that we've got the hang of it, let's try another application, the
-"counter" app.
-
-Like the kvstore app, its code can be found
-[here](https://github.com/tendermint/tendermint/blob/master/abci/cmd/abci-cli/abci-cli.go)
-and looks like:
-
-```go
-func cmdCounter(cmd *cobra.Command, args []string) error {
-
-	app := counter.NewCounterApplication(flagSerial)
-
-	logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))
-
-	// Start the listener
-	srv, err := server.NewServer(flagAddrC, flagAbci, app)
-	if err != nil {
-		return err
-	}
-	srv.SetLogger(logger.With("module", "abci-server"))
-	if err := srv.Start(); err != nil {
-		return err
-	}
-
-	// Stop upon receiving SIGTERM or CTRL-C.
-	tmos.TrapSignal(logger, func() {
-		// Cleanup
-		srv.Stop()
-	})
-
-	// Run forever.
-	select {}
-}
-```
-
-The counter app doesn't use a Merkle tree, it just counts how many times
-we've sent a transaction, asked for a hash, or committed the state. The
-result of `commit` is just the number of transactions sent.
-
-This application has two modes: `serial=off` and `serial=on`.
-
-When `serial=on`, transactions must be a big-endian encoded incrementing
-integer, starting at 0.
-
-If `serial=off`, there are no restrictions on transactions.
-
-We can toggle the value of `serial` using the `set_option` ABCI message.
-
-When `serial=on`, some transactions are invalid. In a live blockchain,
-transactions collect in memory before they are committed into blocks. To
-avoid wasting resources on invalid transactions, ABCI provides the
-`check_tx` message, which application developers can use to accept or
-reject transactions, before they are stored in memory or gossipped to
-other peers.
-
-In this instance of the counter app, `check_tx` only allows transactions
-whose integer is greater than the last committed one.
-
-Let's kill the console and the kvstore application, and start the
-counter app:
-
-```sh
-abci-cli counter
-```
-
-In another window, start the `abci-cli console`:
-
-```sh
-
-> check_tx 0x00
--> code: OK
-
-> check_tx 0xff
--> code: OK
-
-> deliver_tx 0x00
--> code: OK
-
-> check_tx 0x00
--> code: BadNonce
--> log: Invalid nonce. Expected >= 1, got 0
-
-> deliver_tx 0x01
--> code: OK
-
-> deliver_tx 0x04
--> code: BadNonce
--> log: Invalid nonce. Expected 2, got 4
-
-> info
--> code: OK
--> data: {"hashes":0,"txs":2}
--> data.hex: 0x7B22686173686573223A302C22747873223A327D
-```
-
-This is a very simple application, but between `counter` and `kvstore`,
-its easy to see how you can build out arbitrary application states on
-top of the ABCI. [Hyperledger's
-Burrow](https://github.com/hyperledger/burrow) also runs atop ABCI,
-bringing with it Ethereum-like accounts, the Ethereum virtual-machine,
-Monax's permissioning scheme, and native contracts extensions.
-
-But the ultimate flexibility comes from being able to write the
-application easily in any language.
-
-We have implemented the counter in a number of languages [see the
-example directory](https://github.com/tendermint/tendermint/tree/master/abci/example).
-
-To run the Node.js version, fist download & install [the Javascript ABCI server](https://github.com/tendermint/js-abci):
-
-```sh
-git clone https://github.com/tendermint/js-abci.git
-cd js-abci
-npm install abci
-```
-
-Now you can start the app:
-
-```sh
-node example/counter.js
-```
-
-(you'll have to kill the other counter application process). In another
-window, run the console and those previous ABCI commands. You should get
-the same results as for the Go version.
-
 ## Bounties
 
-Want to write the counter app in your favorite language?! We'd be happy
+Want to write an app in your favorite language?! We'd be happy
 to add you to our [ecosystem](https://github.com/tendermint/awesome#ecosystem)!
 See [funding](https://github.com/interchainio/funding) opportunities from the
 [Interchain Foundation](https://interchain.io/) for implementations in new languages and more.
@@ -37,8 +37,8 @@ cd $GOPATH/src/github.com/tendermint/tendermint
 make install_abci
 ```
 
-Now you should have the `abci-cli` installed; you'll see a couple of
-commands (`counter` and `kvstore`) that are example applications written
+Now you should have the `abci-cli` installed; you'll notice the `kvstore`
+command, an example application written
 in Go. See below for an application written in JavaScript.
 
 Now, let's run some apps!

@@ -165,92 +165,6 @@ curl -s 'localhost:26657/abci_query?data="name"'
 Try some other transactions and queries to make sure everything is
 working!
 
-## Counter - Another Example
-
-Now that we've got the hang of it, let's try another application, the
-`counter` app.
-
-The counter app doesn't use a Merkle tree, it just counts how many times
-we've sent a transaction, or committed the state.
-
-This application has two modes: `serial=off` and `serial=on`.
-
-When `serial=on`, transactions must be a big-endian encoded incrementing
-integer, starting at 0.
-
-If `serial=off`, there are no restrictions on transactions.
-
-In a live blockchain, transactions collect in memory before they are
-committed into blocks. To avoid wasting resources on invalid
-transactions, ABCI provides the `CheckTx` message, which application
-developers can use to accept or reject transactions, before they are
-stored in memory or gossipped to other peers.
-
-In this instance of the counter app, with `serial=on`, `CheckTx` only
-allows transactions whose integer is greater than the last committed
-one.
-
-Let's kill the previous instance of `tendermint` and the `kvstore`
-application, and start the counter app. We can enable `serial=on` with a
-flag:
-
-```sh
-abci-cli counter --serial
-```
-
-In another window, reset then start Tendermint:
-
-```sh
-tendermint unsafe_reset_all
-tendermint start
-```
-
-Once again, you can see the blocks streaming by. Let's send some
-transactions. Since we have set `serial=on`, the first transaction must
-be the number `0`:
-
-```sh
-curl localhost:26657/broadcast_tx_commit?tx=0x00
-```
-
-Note the empty (hence successful) response. The next transaction must be
-the number `1`. If instead, we try to send a `5`, we get an error:
-
-```json
-> curl localhost:26657/broadcast_tx_commit?tx=0x05
-{
-  "jsonrpc": "2.0",
-  "id": "",
-  "result": {
-    "check_tx": {},
-    "deliver_tx": {
-      "code": 2,
-      "log": "Invalid nonce. Expected 1, got 5"
-    },
-    "hash": "33B93DFF98749B0D6996A70F64071347060DC19C",
-    "height": 34
-  }
-}
-```
-
-But if we send a `1`, it works again:
-
-```json
-> curl localhost:26657/broadcast_tx_commit?tx=0x01
-{
-  "jsonrpc": "2.0",
-  "id": "",
-  "result": {
-    "check_tx": {},
-    "deliver_tx": {},
-    "hash": "F17854A977F6FA7EEA1BD758E296710B86F72F3D",
-    "height": 60
-  }
-}
-```
-
-For more details on the `broadcast_tx` API, see [the guide on using
-Tendermint](../tendermint-core/using-tendermint.md).
-
 ## CounterJS - Example in Another Language
@@ -31,24 +31,61 @@ For example:
 
 would be equal to the composite key of `jack.account.number`.
 
-Let's take a look at the `[tx_index]` config section:
+By default, Tendermint will index all transactions by their respective hashes
+and height and blocks by their height.
+
+## Configuration
+
+Operators can configure indexing via the `[tx_index]` section. The `indexer`
+field takes a series of supported indexers. If `null` is included, indexing will
+be turned off regardless of other values provided.
 
 ```toml
 ##### transactions indexer configuration options #####
-[tx_index]
+[tx-index]
 
-# What indexer to use for transactions
+# The backend database list to back the indexer.
+# If list contains null, meaning no indexer service will be used.
+#
+# The application will set which txs to index. In some cases a node operator will be able
+# to decide which txs to index based on configuration set in the application.
+#
 # Options:
 # 1) "null"
 # 2) "kv" (default) - the simplest possible indexer, backed by key-value storage (defaults to levelDB; see DBBackend).
-indexer = "kv"
+# - When "kv" is chosen "tx.height" and "tx.hash" will always be indexed.
+# 3) "psql" - the indexer services backed by PostgreSQL.
+# indexer = []
 ```
 
-By default, Tendermint will index all transactions by their respective hashes
-and height and blocks by their height.
+### Supported Indexers
 
-You can turn off indexing completely by setting `tx_index` to `null`.
+#### KV
+
+The `kv` indexer type is an embedded key-value store supported by the main
+underling Tendermint database. Using the `kv` indexer type allows you to query
+for block and transaction events directly against Tendermint's RPC. However, the
+query syntax is limited and so this indexer type might be deprecated or removed
+entirely in the future.
+
+#### PostgreSQL
+
+The `psql` indexer type allows an operator to enable block and transaction event
+indexing by proxying it to an external PostgreSQL instance allowing for the events
+to be stored in relational models. Since the events are stored in a RDBMS, operators
+can leverage SQL to perform a series of rich and complex queries that are not
+supported by the `kv` indexer type. Since operators can leverage SQL directly,
+searching is not enabled for the `psql` indexer type via Tendermint's RPC -- any
+such query will fail.
+
+Note, the SQL schema is stored in `state/indexer/sink/psql/schema.sql` and operators
+must explicitly create the relations prior to starting Tendermint and enabling
+the `psql` indexer type.
+
+Example:
+
+```shell
+$ psql ... -f state/indexer/sink/psql/schema.sql
+```
 
 ## Default Indexes
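For illustration, a hypothetical `[tx-index]` entry under the semantics described above: the field takes a list, and including "null" would disable indexing regardless of the other entries. This line is an assumption of ours, not part of the diff:

```toml
[tx-index]
# hypothetical: enable both the embedded KV indexer and the PostgreSQL sink
indexer = ["kv", "psql"]
```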
@@ -268,6 +268,8 @@ While we do not favor any operation system, more secure and stable Linux server
 distributions (like Centos) should be preferred over desktop operation systems
 (like Mac OS).
 
+Native Windows support is not provided. If you are using a windows machine, you can try using the [bash shell](https://docs.microsoft.com/en-us/windows/wsl/install-win10).
+
 ### Miscellaneous
 
 NOTE: if you are going to use Tendermint in a public domain, make sure
go.mod (9 changes)

@@ -10,9 +10,10 @@ require (
 	github.com/btcsuite/btcd v0.22.0-beta
 	github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
 	github.com/fortytw2/leaktest v1.3.0
-	github.com/go-kit/kit v0.10.0
+	github.com/go-kit/kit v0.11.0
 	github.com/gogo/protobuf v1.3.2
 	github.com/golang/protobuf v1.5.2
+	github.com/golangci/golangci-lint v1.41.1
 	github.com/google/orderedcode v0.0.1
 	github.com/google/uuid v1.2.0
 	github.com/gorilla/websocket v1.4.2

@@ -21,21 +22,19 @@ require (
 	github.com/lib/pq v1.10.2
 	github.com/libp2p/go-buffer-pool v0.0.2
 	github.com/minio/highwayhash v1.0.2
-	github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
 	github.com/oasisprotocol/curve25519-voi v0.0.0-20210609091139-0a56a4bca00b
 	github.com/ory/dockertest v3.3.5+incompatible
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.11.0
-	github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
 	github.com/rs/cors v1.8.0
 	github.com/rs/zerolog v1.23.0
 	github.com/sasha-s/go-deadlock v0.2.1-0.20190427202633-1595213edefa
 	github.com/snikch/goodman v0.0.0-20171125024755-10e37e294daa
-	github.com/spf13/cobra v1.2.0
+	github.com/spf13/cobra v1.2.1
 	github.com/spf13/viper v1.8.1
 	github.com/stretchr/testify v1.7.0
 	github.com/tendermint/tm-db v0.6.4
-	golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
+	golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b
 	golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
 	google.golang.org/grpc v1.39.0
 	gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
@@ -83,6 +83,10 @@ type BlockPool struct {
 
 	requestsCh chan<- BlockRequest
 	errorsCh   chan<- peerError
+
+	startHeight               int64
+	lastHundredBlockTimeStamp time.Time
+	lastSyncRate              float64
 }
 
 // NewBlockPool returns a new BlockPool with the height equal to start. Block

@@ -91,12 +95,14 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p
 	bp := &BlockPool{
 		peers: make(map[types.NodeID]*bpPeer),
 
-		requesters: make(map[int64]*bpRequester),
-		height:     start,
-		numPending: 0,
+		requesters:  make(map[int64]*bpRequester),
+		height:      start,
+		startHeight: start,
+		numPending:  0,
 
-		requestsCh: requestsCh,
-		errorsCh:   errorsCh,
+		requestsCh:   requestsCh,
+		errorsCh:     errorsCh,
+		lastSyncRate: 0,
 	}
 	bp.BaseService = *service.NewBaseService(nil, "BlockPool", bp)
 	return bp

@@ -106,6 +112,7 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p
 // pool's start time.
 func (pool *BlockPool) OnStart() error {
 	pool.lastAdvance = time.Now()
+	pool.lastHundredBlockTimeStamp = pool.lastAdvance
 	go pool.makeRequestersRoutine()
 	return nil
 }

@@ -216,6 +223,19 @@ func (pool *BlockPool) PopRequest() {
 		delete(pool.requesters, pool.height)
 		pool.height++
 		pool.lastAdvance = time.Now()
+
+		// the lastSyncRate will be updated every 100 blocks, it uses the adaptive filter
+		// to smooth the block sync rate and the unit represents the number of blocks per second.
+		if (pool.height-pool.startHeight)%100 == 0 {
+			newSyncRate := 100 / time.Since(pool.lastHundredBlockTimeStamp).Seconds()
+			if pool.lastSyncRate == 0 {
+				pool.lastSyncRate = newSyncRate
+			} else {
+				pool.lastSyncRate = 0.9*pool.lastSyncRate + 0.1*newSyncRate
+			}
+			pool.lastHundredBlockTimeStamp = time.Now()
+		}
+
 	} else {
 		panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height))
 	}

@@ -428,6 +448,20 @@ func (pool *BlockPool) debug() string {
 	return str
 }
 
+func (pool *BlockPool) targetSyncBlocks() int64 {
+	pool.mtx.RLock()
+	defer pool.mtx.RUnlock()
+
+	return pool.maxPeerHeight - pool.startHeight + 1
+}
+
+func (pool *BlockPool) getLastSyncRate() float64 {
+	pool.mtx.RLock()
+	defer pool.mtx.RUnlock()
+
+	return pool.lastSyncRate
+}
+
 //-------------------------------------
 
 type bpPeer struct {
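The smoothing added in `PopRequest` is an exponential moving average with weight 0.1 on each new 100-block window. A self-contained sketch with made-up window rates (the numbers are illustrative only, not from this repository):

```go
package main

import "fmt"

func main() {
	// Observed blocks/s over three successive 100-block windows (assumed values).
	windows := []float64{80, 120, 100}

	rate := 0.0
	for _, observed := range windows {
		if rate == 0 {
			rate = observed // the first window seeds the filter
		} else {
			rate = 0.9*rate + 0.1*observed // same weights as the diff above
		}
	}
	fmt.Printf("smoothed sync rate: %.1f blocks/s\n", rate) // 0.9*84 + 0.1*100 = 85.6
}
```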
@@ -2,6 +2,7 @@ package v0
 
 import (
 	"fmt"
+	"runtime/debug"
 	"sync"
 	"time"
 

@@ -10,6 +11,7 @@
 	"github.com/tendermint/tendermint/internal/p2p"
 	"github.com/tendermint/tendermint/libs/log"
 	"github.com/tendermint/tendermint/libs/service"
+	tmSync "github.com/tendermint/tendermint/libs/sync"
 	bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
 	sm "github.com/tendermint/tendermint/state"
 	"github.com/tendermint/tendermint/store"

@@ -72,7 +74,7 @@ func (e peerError) Error() string {
 	return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error())
 }
 
-// BlockchainReactor handles long-term catchup syncing.
+// Reactor handles long-term catchup syncing.
 type Reactor struct {
 	service.BaseService
 

@@ -83,12 +85,19 @@ type Reactor struct {
 	store       *store.BlockStore
 	pool        *BlockPool
 	consReactor consensusReactor
-	fastSync    bool
+	fastSync    *tmSync.AtomicBool
 
-	blockchainCh  *p2p.Channel
-	peerUpdates   *p2p.PeerUpdates
-	peerUpdatesCh chan p2p.Envelope
-	closeCh       chan struct{}
+	blockchainCh *p2p.Channel
+	// blockchainOutBridgeCh defines a channel that acts as a bridge between sending Envelope
+	// messages that the reactor will consume in processBlockchainCh and receiving messages
+	// from the peer updates channel and other goroutines. We do this instead of directly
+	// sending on blockchainCh.Out to avoid race conditions in the case where other goroutines
+	// send Envelopes directly to the to blockchainCh.Out channel, since processBlockchainCh
+	// may close the blockchainCh.Out channel at the same time that other goroutines send to
+	// blockchainCh.Out.
+	blockchainOutBridgeCh chan p2p.Envelope
+	peerUpdates           *p2p.PeerUpdates
+	closeCh               chan struct{}
 
 	requestsCh <-chan BlockRequest
 	errorsCh   <-chan peerError

@@ -99,6 +108,8 @@ type Reactor struct {
 	poolWG sync.WaitGroup
 
 	metrics *cons.Metrics
+
+	syncStartTime time.Time
 }
 
 // NewReactor returns new reactor instance.

@@ -126,19 +137,20 @@ func NewReactor(
 	errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
 
 	r := &Reactor{
-		initialState:  state,
-		blockExec:     blockExec,
-		store:         store,
-		pool:          NewBlockPool(startHeight, requestsCh, errorsCh),
-		consReactor:   consReactor,
-		fastSync:      fastSync,
-		requestsCh:    requestsCh,
-		errorsCh:      errorsCh,
-		blockchainCh:  blockchainCh,
-		peerUpdates:   peerUpdates,
-		peerUpdatesCh: make(chan p2p.Envelope),
-		closeCh:       make(chan struct{}),
-		metrics:       metrics,
+		initialState:          state,
+		blockExec:             blockExec,
+		store:                 store,
+		pool:                  NewBlockPool(startHeight, requestsCh, errorsCh),
+		consReactor:           consReactor,
+		fastSync:              tmSync.NewBool(fastSync),
+		requestsCh:            requestsCh,
+		errorsCh:              errorsCh,
+		blockchainCh:          blockchainCh,
+		blockchainOutBridgeCh: make(chan p2p.Envelope),
+		peerUpdates:           peerUpdates,
+		closeCh:               make(chan struct{}),
+		metrics:               metrics,
+		syncStartTime:         time.Time{},
 	}
 
 	r.BaseService = *service.NewBaseService(logger, "Blockchain", r)

@@ -153,7 +165,7 @@ func NewReactor(
 // If fastSync is enabled, we also start the pool and the pool processing
 // goroutine. If the pool fails to start, an error is returned.
 func (r *Reactor) OnStart() error {
-	if r.fastSync {
+	if r.fastSync.IsSet() {
 		if err := r.pool.Start(); err != nil {
 			return err
 		}

@@ -171,7 +183,7 @@ func (r *Reactor) OnStart() error {
 // OnStop stops the reactor by signaling to all spawned goroutines to exit and
 // blocking until they all exit.
 func (r *Reactor) OnStop() {
-	if r.fastSync {
+	if r.fastSync.IsSet() {
 		if err := r.pool.Stop(); err != nil {
 			r.Logger.Error("failed to stop pool", "err", err)
 		}

@@ -265,7 +277,11 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
 	defer func() {
 		if e := recover(); e != nil {
 			err = fmt.Errorf("panic in processing message: %v", e)
-			r.Logger.Error("recovering from processing message panic", "err", err)
+			r.Logger.Error(
+				"recovering from processing message panic",
+				"err", err,
+				"stack", string(debug.Stack()),
+			)
 		}
 	}()
 

@@ -283,7 +299,7 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
 }
 
 // processBlockchainCh initiates a blocking process where we listen for and handle
-// envelopes on the BlockchainChannel and peerUpdatesCh. Any error encountered during
+// envelopes on the BlockchainChannel and blockchainOutBridgeCh. Any error encountered during
 // message execution will result in a PeerError being sent on the BlockchainChannel.
 // When the reactor is stopped, we will catch the signal and close the p2p Channel
 // gracefully.

@@ -301,8 +317,8 @@ func (r *Reactor) processBlockchainCh() {
 			}
 		}
 
-		case envelop := <-r.peerUpdatesCh:
-			r.blockchainCh.Out <- envelop
+		case envelope := <-r.blockchainOutBridgeCh:
+			r.blockchainCh.Out <- envelope
 
 		case <-r.closeCh:
 			r.Logger.Debug("stopped listening on blockchain channel; closing...")

@@ -324,7 +340,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
 	switch peerUpdate.Status {
 	case p2p.PeerStatusUp:
 		// send a status update the newly added peer
-		r.peerUpdatesCh <- p2p.Envelope{
+		r.blockchainOutBridgeCh <- p2p.Envelope{
 			To: peerUpdate.NodeID,
 			Message: &bcproto.StatusResponse{
 				Base: r.store.Base(),

@@ -358,7 +374,7 @@ func (r *Reactor) processPeerUpdates() {
 // SwitchToFastSync is called by the state sync reactor when switching to fast
 // sync.
 func (r *Reactor) SwitchToFastSync(state sm.State) error {
-	r.fastSync = true
+	r.fastSync.Set()
 	r.initialState = state
 	r.pool.height = state.LastBlockHeight + 1
 

@@ -366,6 +382,8 @@ func (r *Reactor) SwitchToFastSync(state sm.State) error {
 		return err
 	}
 
+	r.syncStartTime = time.Now()
+
 	r.poolWG.Add(1)
 	go r.poolRoutine(true)
 

@@ -388,7 +406,7 @@ func (r *Reactor) requestRoutine() {
 			return
 
 		case request := <-r.requestsCh:
-			r.blockchainCh.Out <- p2p.Envelope{
+			r.blockchainOutBridgeCh <- p2p.Envelope{
 				To:      request.PeerID,
 				Message: &bcproto.BlockRequest{Height: request.Height},
 			}

@@ -405,7 +423,7 @@ func (r *Reactor) requestRoutine() {
 			go func() {
 				defer r.poolWG.Done()
 
-				r.blockchainCh.Out <- p2p.Envelope{
+				r.blockchainOutBridgeCh <- p2p.Envelope{
 					Broadcast: true,
 					Message:   &bcproto.StatusRequest{},
 				}

@@ -478,6 +496,8 @@ FOR_LOOP:
 				r.Logger.Error("failed to stop pool", "err", err)
 			}
 
+			r.fastSync.UnSet()
+
 			if r.consReactor != nil {
 				r.consReactor.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
 			}

@@ -592,3 +612,27 @@ FOR_LOOP:
 func (r *Reactor) GetMaxPeerBlockHeight() int64 {
 	return r.pool.MaxPeerHeight()
 }
+
+func (r *Reactor) GetTotalSyncedTime() time.Duration {
+	if !r.fastSync.IsSet() || r.syncStartTime.IsZero() {
+		return time.Duration(0)
+	}
+	return time.Since(r.syncStartTime)
+}
+
+func (r *Reactor) GetRemainingSyncTime() time.Duration {
+	if !r.fastSync.IsSet() {
+		return time.Duration(0)
+	}
+
+	targetSyncs := r.pool.targetSyncBlocks()
+	currentSyncs := r.store.Height() - r.pool.startHeight + 1
+	lastSyncRate := r.pool.getLastSyncRate()
+	if currentSyncs < 0 || lastSyncRate < 0.001 {
+		return time.Duration(0)
+	}
+
+	remain := float64(targetSyncs-currentSyncs) / lastSyncRate
+
+	return time.Duration(int64(remain * float64(time.Second)))
+}
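A quick arithmetic check of `GetRemainingSyncTime` with assumed inputs (none of these numbers come from the diff):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	targetSyncs := int64(10000) // maxPeerHeight - startHeight + 1 (assumed)
	currentSyncs := int64(4000) // blocks synced so far (assumed)
	lastSyncRate := 85.6        // smoothed blocks/s from the pool (assumed)

	remain := float64(targetSyncs-currentSyncs) / lastSyncRate
	fmt.Println(time.Duration(int64(remain * float64(time.Second)))) // ≈ 1m10s
}
```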
@@ -215,6 +215,29 @@ func TestReactor_AbruptDisconnect(t *testing.T) {
 	rts.network.Nodes[rts.nodes[1]].PeerManager.Disconnected(rts.nodes[0])
 }
 
+func TestReactor_SyncTime(t *testing.T) {
+	config := cfg.ResetTestRoot("blockchain_reactor_test")
+	defer os.RemoveAll(config.RootDir)
+
+	genDoc, privVals := factory.RandGenesisDoc(config, 1, false, 30)
+	maxBlockHeight := int64(101)
+
+	rts := setup(t, genDoc, privVals[0], []int64{maxBlockHeight, 0}, 0)
+	require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height())
+	rts.start(t)
+
+	require.Eventually(
+		t,
+		func() bool {
+			return rts.reactors[rts.nodes[1]].GetRemainingSyncTime() > time.Nanosecond &&
+				rts.reactors[rts.nodes[1]].pool.getLastSyncRate() > 0.001
+		},
+		10*time.Second,
+		10*time.Millisecond,
+		"expected node to be partially synced",
+	)
+}
+
 func TestReactor_NoBlockResponse(t *testing.T) {
 	config := cfg.ResetTestRoot("blockchain_reactor_test")
 	defer os.RemoveAll(config.RootDir)
@@ -13,6 +13,7 @@ import (
 	tmsync "github.com/tendermint/tendermint/internal/libs/sync"
 	"github.com/tendermint/tendermint/internal/p2p"
 	"github.com/tendermint/tendermint/libs/log"
+	"github.com/tendermint/tendermint/libs/sync"
 	bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain"
 	"github.com/tendermint/tendermint/state"
 	"github.com/tendermint/tendermint/types"

@@ -34,8 +35,8 @@ type blockStore interface {
 type BlockchainReactor struct {
 	p2p.BaseReactor
 
-	fastSync    bool // if true, enable fast sync on start
-	stateSynced bool // set to true when SwitchToFastSync is called by state sync
+	fastSync    *sync.AtomicBool // enable fast sync on start when it's been Set
+	stateSynced bool             // set to true when SwitchToFastSync is called by state sync
 	scheduler   *Routine
 	processor   *Routine
 	logger      log.Logger

@@ -48,6 +49,10 @@ type BlockchainReactor struct {
 	reporter behavior.Reporter
 	io       iIO
 	store    blockStore
+
+	syncStartTime   time.Time
+	syncStartHeight int64
+	lastSyncRate    float64 // # blocks sync per sec base on the last 100 blocks
 }
 
 type blockApplier interface {

@@ -68,12 +73,15 @@ func newReactor(state state.State, store blockStore, reporter behavior.Reporter,
 	processor := newPcState(pContext)
 
 	return &BlockchainReactor{
-		scheduler: newRoutine("scheduler", scheduler.handle, chBufferSize),
-		processor: newRoutine("processor", processor.handle, chBufferSize),
-		store:     store,
-		reporter:  reporter,
-		logger:    log.NewNopLogger(),
-		fastSync:  fastSync,
+		scheduler:       newRoutine("scheduler", scheduler.handle, chBufferSize),
+		processor:       newRoutine("processor", processor.handle, chBufferSize),
+		store:           store,
+		reporter:        reporter,
+		logger:          log.NewNopLogger(),
+		fastSync:        sync.NewBool(fastSync),
+		syncStartHeight: initHeight,
+		syncStartTime:   time.Time{},
+		lastSyncRate:    0,
 	}
 }

@@ -129,7 +137,7 @@ func (r *BlockchainReactor) SetLogger(logger log.Logger) {
 // Start implements cmn.Service interface
 func (r *BlockchainReactor) Start() error {
 	r.reporter = behavior.NewSwitchReporter(r.BaseReactor.Switch)
-	if r.fastSync {
+	if r.fastSync.IsSet() {
 		err := r.startSync(nil)
 		if err != nil {
 			return fmt.Errorf("failed to start fast sync: %w", err)

@@ -175,7 +183,13 @@ func (r *BlockchainReactor) endSync() {
 func (r *BlockchainReactor) SwitchToFastSync(state state.State) error {
 	r.stateSynced = true
 	state = state.Copy()
-	return r.startSync(&state)
+
+	err := r.startSync(&state)
+	if err == nil {
+		r.syncStartTime = time.Now()
+	}
+
+	return err
 }
 
 // reactor generated ticker events:

@@ -283,7 +297,6 @@ func (e bcResetState) String() string {
 
 // Takes the channel as a parameter to avoid race conditions on r.events.
 func (r *BlockchainReactor) demux(events <-chan Event) {
-	var lastRate = 0.0
 	var lastHundred = time.Now()
 
 	var (

@@ -414,10 +427,15 @@ func (r *BlockchainReactor) demux(events <-chan Event) {
 			switch event := event.(type) {
 			case pcBlockProcessed:
 				r.setSyncHeight(event.height)
-				if r.syncHeight%100 == 0 {
-					lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
+				if (r.syncHeight-r.syncStartHeight)%100 == 0 {
+					newSyncRate := 100 / time.Since(lastHundred).Seconds()
+					if r.lastSyncRate == 0 {
+						r.lastSyncRate = newSyncRate
+					} else {
+						r.lastSyncRate = 0.9*r.lastSyncRate + 0.1*newSyncRate
+					}
 					r.logger.Info("Fast Sync Rate", "height", r.syncHeight,
-						"max_peer_height", r.maxPeerHeight, "blocks/s", lastRate)
+						"max_peer_height", r.maxPeerHeight, "blocks/s", r.lastSyncRate)
 					lastHundred = time.Now()
 				}
 				r.scheduler.send(event)

@@ -429,6 +447,7 @@ func (r *BlockchainReactor) demux(events <-chan Event) {
 					r.logger.Error("Failed to switch to consensus reactor")
 				}
 				r.endSync()
+				r.fastSync.UnSet()
 				return
 			case noOpEvent:
 			default:

@@ -596,3 +615,29 @@ func (r *BlockchainReactor) GetMaxPeerBlockHeight() int64 {
 	defer r.mtx.RUnlock()
 	return r.maxPeerHeight
 }
+
+func (r *BlockchainReactor) GetTotalSyncedTime() time.Duration {
+	if !r.fastSync.IsSet() || r.syncStartTime.IsZero() {
+		return time.Duration(0)
+	}
+	return time.Since(r.syncStartTime)
+}
+
+func (r *BlockchainReactor) GetRemainingSyncTime() time.Duration {
+	if !r.fastSync.IsSet() {
+		return time.Duration(0)
+	}
+
+	r.mtx.RLock()
+	defer r.mtx.RUnlock()
+
+	targetSyncs := r.maxPeerHeight - r.syncStartHeight
+	currentSyncs := r.syncHeight - r.syncStartHeight + 1
+	if currentSyncs < 0 || r.lastSyncRate < 0.001 {
+		return time.Duration(0)
+	}
+
+	remain := float64(targetSyncs-currentSyncs) / r.lastSyncRate
+
+	return time.Duration(int64(remain * float64(time.Second)))
+}
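Per the \#6620 changelog entry above, these durations surface in the `SyncInfo` section of the `/status` RPC. A hedged one-liner for inspecting them while a node is fast-syncing (assumes the default RPC port and that `jq` is installed; the exact JSON field names are not shown in this diff):

```sh
curl -s localhost:26657/status | jq '.result.sync_info'
```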
@@ -35,7 +35,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
 	prevoteHeight := int64(2)
 	testName := "consensus_byzantine_test"
 	tickerFunc := newMockTickerFunc(true)
-	appFunc := newCounter
+	appFunc := newKVStore
 
 	genDoc, privVals := factory.RandGenesisDoc(config, nValidators, false, 30)
 	states := make([]*State, nValidators)
@@ -19,7 +19,6 @@ import (
 	dbm "github.com/tendermint/tm-db"
 
 	abcicli "github.com/tendermint/tendermint/abci/client"
-	"github.com/tendermint/tendermint/abci/example/counter"
 	"github.com/tendermint/tendermint/abci/example/kvstore"
 	abci "github.com/tendermint/tendermint/abci/types"
 	cfg "github.com/tendermint/tendermint/config"

@@ -449,7 +448,7 @@ func randState(config *cfg.Config, nValidators int) (*State, []*validatorStub) {
 
 	vss := make([]*validatorStub, nValidators)
 
-	cs := newState(state, privVals[0], counter.NewApplication(true))
+	cs := newState(state, privVals[0], kvstore.NewApplication())
 
 	for i := 0; i < nValidators; i++ {
 		vss[i] = newValidatorStub(privVals[i], int32(i))

@@ -862,10 +861,6 @@ func (m *mockTicker) Chan() <-chan timeoutInfo {
 
 func (*mockTicker) SetLogger(log.Logger) {}
 
-func newCounter() abci.Application {
-	return counter.NewApplication(true)
-}
-
 func newPersistentKVStore() abci.Application {
 	dir, err := ioutil.TempDir("", "persistent-kvstore")
 	if err != nil {

@@ -874,6 +869,10 @@ func newPersistentKVStore() abci.Application {
 	return kvstore.NewPersistentKVStoreApplication(dir)
 }
 
+func newKVStore() abci.Application {
+	return kvstore.NewApplication()
+}
+
 func newPersistentKVStoreWithPath(dbDir string) abci.Application {
 	return kvstore.NewPersistentKVStoreApplication(dbDir)
 }
@@ -18,7 +18,9 @@ func TestReactorInvalidPrecommit(t *testing.T) {
 	config := configSetup(t)
 
 	n := 4
-	states, cleanup := randConsensusState(t, config, n, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
+	states, cleanup := randConsensusState(t,
+		config, n, "consensus_reactor_test",
+		newMockTickerFunc(true), newKVStore)
 	t.Cleanup(cleanup)
 
 	for i := 0; i < 4; i++ {
@@ -2,6 +2,7 @@ package consensus
 
 import (
 	"fmt"
+	"runtime/debug"
 	"time"
 
 	cstypes "github.com/tendermint/tendermint/internal/consensus/types"

@@ -101,6 +102,14 @@ type FastSyncReactor interface {
 	SwitchToFastSync(sm.State) error
 
 	GetMaxPeerBlockHeight() int64
+
+	// GetTotalSyncedTime returns the time duration since the fastsync starting.
+	GetTotalSyncedTime() time.Duration
+
+	// GetRemainingSyncTime returns the estimating time the node will be fully synced,
+	// if will return 0 if the fastsync does not perform or the number of block synced is
+	// too small (less than 100).
+	GetRemainingSyncTime() time.Duration
 }
 
 // Reactor defines a reactor for the consensus service.

@@ -1197,21 +1206,24 @@ func (r *Reactor) handleVoteSetBitsMessage(envelope p2p.Envelope, msgI Message)
 // It will handle errors and any possible panics gracefully. A caller can handle
 // any error returned by sending a PeerError on the respective channel.
 //
 // NOTE: We process these messages even when we're fast_syncing. Messages affect
 // either a peer state or the consensus state. Peer state updates can happen in
 // parallel, but processing of proposals, block parts, and votes are ordered by
 // the p2p channel.
 //
 // NOTE: We block on consensus state for proposals, block parts, and votes.
 func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) {
 	defer func() {
 		if e := recover(); e != nil {
 			err = fmt.Errorf("panic in processing message: %v", e)
-			r.Logger.Error("recovering from processing message panic", "err", err)
+			r.Logger.Error(
+				"recovering from processing message panic",
+				"err", err,
+				"stack", string(debug.Stack()),
+			)
 		}
 	}()
 
 	// Just skip the entire message during syncing so that we can
 	// process fewer messages.
 	if r.WaitSync() {
 		return
 	}
 
 	// We wrap the envelope's message in a Proto wire type so we can convert back
 	// the domain type that individual channel message handlers can work with. We
 	// do this here once to avoid having to do it for each individual message type.
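The recover-and-log change repeated across the blockchain, consensus, and evidence reactors in this compare follows one pattern: convert the panic into a returned error and log the stack from `runtime/debug`. A minimal standalone sketch (function names here are illustrative, not from the repo):

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// handle converts a panic raised while processing a message into a
// returned error and prints the stack trace for debugging.
func handle() (err error) {
	defer func() {
		if e := recover(); e != nil {
			err = fmt.Errorf("panic in processing message: %v", e)
			fmt.Printf("recovering from panic: %v\nstack:\n%s", err, debug.Stack())
		}
	}()
	panic("boom") // simulate a message handler panic
}

func main() {
	fmt.Println("returned error:", handle())
}
```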
@@ -257,7 +257,9 @@ func TestReactorBasic(t *testing.T) {
 	config := configSetup(t)
 
 	n := 4
-	states, cleanup := randConsensusState(t, config, n, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
+	states, cleanup := randConsensusState(t,
+		config, n, "consensus_reactor_test",
+		newMockTickerFunc(true), newKVStore)
 	t.Cleanup(cleanup)
 
 	rts := setup(t, n, states, 100) // buffer must be large enough to not deadlock

@@ -287,7 +289,7 @@ func TestReactorWithEvidence(t *testing.T) {
 	n := 4
 	testName := "consensus_reactor_test"
 	tickerFunc := newMockTickerFunc(true)
-	appFunc := newCounter
+	appFunc := newKVStore
 
 	genDoc, privVals := factory.RandGenesisDoc(config, n, false, 30)
 	states := make([]*State, n)

@@ -387,7 +389,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
 		n,
 		"consensus_reactor_test",
 		newMockTickerFunc(true),
-		newCounter,
+		newKVStore,
 		func(c *cfg.Config) {
 			c.Consensus.CreateEmptyBlocks = false
 		},

@@ -431,7 +433,9 @@ func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
 	config := configSetup(t)
 
 	n := 4
-	states, cleanup := randConsensusState(t, config, n, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
+	states, cleanup := randConsensusState(t,
+		config, n, "consensus_reactor_test",
+		newMockTickerFunc(true), newKVStore)
 	t.Cleanup(cleanup)
 
 	rts := setup(t, n, states, 100) // buffer must be large enough to not deadlock
@@ -10,7 +10,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
-	"github.com/tendermint/tendermint/abci/example/counter"
+	"github.com/tendermint/tendermint/abci/example/kvstore"
 	"github.com/tendermint/tendermint/crypto/tmhash"
 	cstypes "github.com/tendermint/tendermint/internal/consensus/types"
 	p2pmock "github.com/tendermint/tendermint/internal/p2p/mock"

@@ -654,7 +654,7 @@ func TestStateLockPOLRelock(t *testing.T) {
 	signAddVotes(config, cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4)
 
 	// before we timeout to the new round set the new proposal
-	cs2 := newState(cs1.state, vs2, counter.NewApplication(true))
+	cs2 := newState(cs1.state, vs2, kvstore.NewApplication())
 	prop, propBlock := decideProposal(cs2, vs2, vs2.Height, vs2.Round+1)
 	if prop == nil || propBlock == nil {
 		t.Fatal("Failed to create proposal block with vs2")

@@ -843,7 +843,7 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) {
 	signAddVotes(config, cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4)
 
 	// before we timeout to the new round set the new proposal
-	cs2 := newState(cs1.state, vs2, counter.NewApplication(true))
+	cs2 := newState(cs1.state, vs2, kvstore.NewApplication())
 	prop, propBlock := decideProposal(cs2, vs2, vs2.Height, vs2.Round+1)
 	if prop == nil || propBlock == nil {
 		t.Fatal("Failed to create proposal block with vs2")

@@ -887,7 +887,7 @@ func TestStateLockPOLUnlockOnUnknownBlock(t *testing.T) {
 	signAddVotes(config, cs1, tmproto.PrecommitType, nil, types.PartSetHeader{}, vs2, vs3, vs4)
 
 	// before we timeout to the new round set the new proposal
-	cs3 := newState(cs1.state, vs3, counter.NewApplication(true))
+	cs3 := newState(cs1.state, vs3, kvstore.NewApplication())
 	prop, propBlock = decideProposal(cs3, vs3, vs3.Height, vs3.Round+1)
 	if prop == nil || propBlock == nil {
 		t.Fatal("Failed to create proposal block with vs2")
@@ -2,6 +2,7 @@ package evidence

import (
    "fmt"
+   "runtime/debug"
    "sync"
    "time"

@@ -165,6 +166,11 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
    defer func() {
        if e := recover(); e != nil {
            err = fmt.Errorf("panic in processing message: %v", e)
            r.Logger.Error(
                "recovering from processing message panic",
                "err", err,
                "stack", string(debug.Stack()),
            )
        }
    }()

@@ -296,7 +302,11 @@ func (r *Reactor) broadcastEvidenceLoop(peerID types.NodeID, closer *tmsync.Clos
    r.peerWG.Done()

    if e := recover(); e != nil {
-       r.Logger.Error("recovering from broadcasting evidence loop", "err", e)
+       r.Logger.Error(
+           "recovering from broadcasting evidence loop",
+           "err", e,
+           "stack", string(debug.Stack()),
+       )
    }
}()
@@ -146,7 +146,7 @@ func (g *Group) OnStart() error {
func (g *Group) OnStop() {
    g.ticker.Stop()
    if err := g.FlushAndSync(); err != nil {
-       g.Logger.Error("Error flushin to disk", "err", err)
+       g.Logger.Error("Error flushing to disk", "err", err)
    }
}

@@ -160,7 +160,7 @@ func (g *Group) Wait() {
// Close closes the head file. The group must be stopped by this moment.
func (g *Group) Close() {
    if err := g.FlushAndSync(); err != nil {
-       g.Logger.Error("Error flushin to disk", "err", err)
+       g.Logger.Error("Error flushing to disk", "err", err)
    }

    g.mtx.Lock()
@@ -15,7 +15,6 @@ import (
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"

-   "github.com/tendermint/tendermint/abci/example/counter"
    "github.com/tendermint/tendermint/abci/example/kvstore"
    abciserver "github.com/tendermint/tendermint/abci/server"
    abci "github.com/tendermint/tendermint/abci/types"

@@ -217,7 +216,7 @@ func TestMempoolUpdate(t *testing.T) {
}

func TestMempool_KeepInvalidTxsInCache(t *testing.T) {
-   app := counter.NewApplication(true)
+   app := kvstore.NewApplication()
    cc := proxy.NewLocalClientCreator(app)
    wcfg := cfg.DefaultConfig()
    wcfg.Mempool.KeepInvalidTxsInCache = true

@@ -309,7 +308,7 @@ func TestTxsAvailable(t *testing.T) {
}

func TestSerialReap(t *testing.T) {
-   app := counter.NewApplication(true)
+   app := kvstore.NewApplication()
    cc := proxy.NewLocalClientCreator(app)

    mp, cleanup := newMempoolWithApp(cc)

@@ -508,7 +507,7 @@ func TestMempoolTxsBytes(t *testing.T) {
}

    // 6. zero after tx is rechecked and removed due to not being valid anymore
-   app2 := counter.NewApplication(true)
+   app2 := kvstore.NewApplication()
    cc = proxy.NewLocalClientCreator(app2)
    mp, cleanup = newMempoolWithApp(cc)
    defer cleanup()

@@ -540,16 +539,16 @@ func TestMempoolTxsBytes(t *testing.T) {
    // Pretend like we committed nothing so txBytes gets rechecked and removed.
    err = mp.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil, nil)
    require.NoError(t, err)
-   assert.EqualValues(t, 0, mp.SizeBytes())
+   assert.EqualValues(t, 8, mp.SizeBytes())

    // 7. Test RemoveTxByKey function
    err = mp.CheckTx(context.Background(), []byte{0x06}, nil, mempool.TxInfo{})
    require.NoError(t, err)
-   assert.EqualValues(t, 1, mp.SizeBytes())
+   assert.EqualValues(t, 9, mp.SizeBytes())
    mp.RemoveTxByKey(mempool.TxKey([]byte{0x07}), true)
-   assert.EqualValues(t, 1, mp.SizeBytes())
+   assert.EqualValues(t, 9, mp.SizeBytes())
    mp.RemoveTxByKey(mempool.TxKey([]byte{0x06}), true)
-   assert.EqualValues(t, 0, mp.SizeBytes())
+   assert.EqualValues(t, 8, mp.SizeBytes())

}
@@ -4,6 +4,7 @@ import (
    "context"
    "errors"
    "fmt"
+   "runtime/debug"
    "sync"
    "time"

@@ -188,6 +189,11 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
    defer func() {
        if e := recover(); e != nil {
            err = fmt.Errorf("panic in processing message: %v", e)
            r.Logger.Error(
                "recovering from processing message panic",
                "err", err,
                "stack", string(debug.Stack()),
            )
        }
    }()

@@ -312,7 +318,11 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer)
    r.peerWG.Done()

    if e := recover(); e != nil {
-       r.Logger.Error("recovering from broadcasting mempool loop", "err", e)
+       r.Logger.Error(
+           "recovering from broadcasting mempool loop",
+           "err", e,
+           "stack", string(debug.Stack()),
+       )
    }
}()
@@ -4,6 +4,7 @@ import (
    "context"
    "errors"
    "fmt"
+   "runtime/debug"
    "sync"
    "time"

@@ -187,6 +188,11 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
    defer func() {
        if e := recover(); e != nil {
            err = fmt.Errorf("panic in processing message: %v", e)
            r.Logger.Error(
                "recovering from processing message panic",
                "err", err,
                "stack", string(debug.Stack()),
            )
        }
    }()

@@ -311,7 +317,11 @@ func (r *Reactor) broadcastTxRoutine(peerID types.NodeID, closer *tmsync.Closer)
    r.peerWG.Done()

    if e := recover(); e != nil {
-       r.Logger.Error("recovering from broadcasting mempool loop", "err", e)
+       r.Logger.Error(
+           "recovering from broadcasting mempool loop",
+           "err", e,
+           "stack", string(debug.Stack()),
+       )
    }
}()
34  internal/p2p/pex/doc.go  Normal file
@@ -0,0 +1,34 @@
/*
Package PEX (Peer exchange) handles all the logic necessary for nodes to share
information about their peers to other nodes. Specifically, this is the exchange
of addresses that a peer can use to discover more peers within the network.

The PEX reactor is a continuous service which periodically requests addresses
and serves addresses to other peers. There are two versions of this service,
aligning with the two p2p frameworks that Tendermint currently supports.

V1 is coupled with the Switch (which handles peer connections and routing of
messages) and, alongside exchanging peer information in the form of port/IP
pairs, also has the responsibility of dialing peers and ensuring that a
node has a sufficient number of peers connected.

V2 is embedded in the new p2p stack and uses the peer manager to advertise
peers as well as add new peers to the peer store. The V2 reactor passes a
different set of proto messages which include a list of
[urls](https://golang.org/pkg/net/url/#URL). These can be used to save a set of
endpoints that each peer uses. The V2 reactor is backwards compatible with
V1: it can also handle V1 messages.

The V2 reactor is able to tweak the intensity of its search by decreasing or
increasing the interval between each request. It tracks connected peers via a
linked list, sending a request to the node at the front of the list and adding
it to the back of the list once a response is received. Using this method, a
node is able to spread out the load of requesting peers across all the peers it
is currently connected with.

With each inbound set of addresses, the reactor monitors the ratio of new
addresses to already-seen addresses and uses that information to dynamically
build a picture of the size of the network, in order to ascertain how often the
node needs to search for new peers.
*/
package pex
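The front-of-list/back-of-list rotation the doc describes is easy to picture with a toy sketch using container/list (an illustration only, not the reactor's actual data structure):

package main

import (
    "container/list"
    "fmt"
)

func main() {
    peers := list.New()
    for _, id := range []string{"a", "b", "c"} {
        peers.PushBack(id)
    }
    // Each request polls the peer at the front of the list and rotates it
    // to the back (in the reactor this happens once the response arrives),
    // spreading the request load evenly across all connected peers.
    for i := 0; i < 5; i++ {
        front := peers.Front()
        peers.MoveToBack(front)
        fmt.Println("requesting addresses from", front.Value)
    }
}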
@@ -3,6 +3,7 @@ package pex

import (
    "context"
    "fmt"
+   "runtime/debug"
    "sync"
    "time"

@@ -367,6 +368,11 @@ func (r *ReactorV2) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (er
    defer func() {
        if e := recover(); e != nil {
            err = fmt.Errorf("panic in processing message: %v", e)
            r.Logger.Error(
                "recovering from processing message panic",
                "err", err,
                "stack", string(debug.Stack()),
            )
        }
    }()
@@ -476,8 +476,10 @@ func (r *Router) routeChannel(
    }

    if !contains {
+       r.logger.Error("tried to send message across a channel that the peer doesn't have available",
+           "peer", envelope.To, "channel", chID)
        // reactor tried to send a message across a channel that the
        // peer doesn't have available. This is a known issue due to
        // how peer subscriptions work:
        // https://github.com/tendermint/tendermint/issues/6598
        continue
    }
@@ -45,6 +45,8 @@ func newDispatcher(requestCh chan<- p2p.Envelope, timeout time.Duration) *dispat
    }
}

+// LightBlock uses the request channel to fetch a light block from the next peer
+// in a list, tracks the call and waits for the reactor to pass along the response
func (d *dispatcher) LightBlock(ctx context.Context, height int64) (*types.LightBlock, types.NodeID, error) {
    d.mtx.Lock()
    outgoingCalls := len(d.calls)

@@ -57,11 +59,13 @@ func (d *dispatcher) LightBlock(ctx context.Context, height int64) (*types.Light

    // fetch the next peer id in the list and request a light block from that
    // peer
-   peer := d.availablePeers.Pop()
+   peer := d.availablePeers.Pop(ctx)
    lb, err := d.lightBlock(ctx, height, peer)
    return lb, peer, err
}

+// Providers turns the dispatcher into a set of providers (per peer) which can
+// be used by a light client
func (d *dispatcher) Providers(chainID string, timeout time.Duration) []provider.Provider {
    d.mtx.Lock()
    defer d.mtx.Unlock()

@@ -272,7 +276,7 @@ func (l *peerlist) Len() int {
    return len(l.peers)
}

-func (l *peerlist) Pop() types.NodeID {
+func (l *peerlist) Pop(ctx context.Context) types.NodeID {
    l.mtx.Lock()
    if len(l.peers) == 0 {
        // if we don't have any peers in the list we block until a peer is

@@ -281,8 +285,13 @@ func (l *peerlist) Pop() types.NodeID {
    l.waiting = append(l.waiting, wait)
    // unlock whilst waiting so that the list can be appended to
    l.mtx.Unlock()
-   peer := <-wait
-   return peer
+   select {
+   case peer := <-wait:
+       return peer
+
+   case <-ctx.Done():
+       return ""
+   }
}

    peer := l.peers[0]
@@ -95,7 +95,7 @@ func TestPeerListBasic(t *testing.T) {

    half := numPeers / 2
    for i := 0; i < half; i++ {
-       assert.Equal(t, peerSet[i], peerList.Pop())
+       assert.Equal(t, peerSet[i], peerList.Pop(ctx))
    }
    assert.Equal(t, half, peerList.Len())

@@ -104,7 +104,7 @@ func TestPeerListBasic(t *testing.T) {

    peerList.Remove(peerSet[half])
    half++
-   assert.Equal(t, peerSet[half], peerList.Pop())
+   assert.Equal(t, peerSet[half], peerList.Pop(ctx))

}

@@ -117,7 +117,7 @@ func TestPeerListConcurrent(t *testing.T) {
    // peer list hasn't been populated each these go routines should block
    for i := 0; i < numPeers/2; i++ {
        go func() {
-           _ = peerList.Pop()
+           _ = peerList.Pop(ctx)
            wg.Done()
        }()
    }

@@ -132,7 +132,7 @@ func TestPeerListConcurrent(t *testing.T) {
    // we request the second half of the peer set
    for i := 0; i < numPeers/2; i++ {
        go func() {
-           _ = peerList.Pop()
+           _ = peerList.Pop(ctx)
            wg.Done()
        }()
    }
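Threading ctx through Pop is what lets a blocked caller be released when backfill is cancelled. A condensed, self-contained sketch of the same pattern (per-waiter channel, unlock while waiting); this peerList is a simplified stand-in for the dispatcher's peerlist, with strings in place of node IDs:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type peerList struct {
    mtx     sync.Mutex
    peers   []string
    waiting []chan string
}

func (l *peerList) Pop(ctx context.Context) string {
    l.mtx.Lock()
    if len(l.peers) == 0 {
        wait := make(chan string, 1)
        l.waiting = append(l.waiting, wait)
        l.mtx.Unlock() // unlock while waiting so Append can run
        select {
        case peer := <-wait:
            return peer
        case <-ctx.Done():
            return "" // cancelled: don't block forever
        }
    }
    peer := l.peers[0]
    l.peers = l.peers[1:]
    l.mtx.Unlock()
    return peer
}

func (l *peerList) Append(peer string) {
    l.mtx.Lock()
    defer l.mtx.Unlock()
    // hand the peer directly to the oldest waiter, if any
    if len(l.waiting) > 0 {
        l.waiting[0] <- peer
        l.waiting = l.waiting[1:]
        return
    }
    l.peers = append(l.peers, peer)
}

func main() {
    l := &peerList{}
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()
    // empty list: the Pop blocks until the context deadline fires
    fmt.Printf("popped %q (empty list, ctx expired)\n", l.Pop(ctx))
}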
@@ -6,6 +6,7 @@ import (
    "errors"
    "fmt"
    "reflect"
+   "runtime/debug"
    "sort"
    "time"

@@ -310,12 +311,17 @@ func (r *Reactor) backfill(
    // waiting on blocks. If it takes 4s to retrieve a block and 1s to verify
    // it, then steady state involves four workers.
    for i := 0; i < int(r.cfg.Fetchers); i++ {
+       ctxWithCancel, cancel := context.WithCancel(ctx)
+       defer cancel()
        go func() {
            for {
                select {
                case height := <-queue.nextHeight():
                    r.Logger.Debug("fetching next block", "height", height)
-                   lb, peer, err := r.dispatcher.LightBlock(ctx, height)
+                   lb, peer, err := r.dispatcher.LightBlock(ctxWithCancel, height)
+                   if errors.Is(err, context.Canceled) {
+                       return
+                   }
                    if err != nil {
                        queue.retry(height)
                        if errors.Is(err, errNoConnectedPeers) {

@@ -332,8 +338,8 @@ func (r *Reactor) backfill(
    if lb == nil {
        r.Logger.Info("backfill: peer didn't have block, fetching from another peer", "height", height)
        queue.retry(height)
-       // as we are fetching blocks backwards, if this node doesn't have the block it likely doesn't
-       // have any prior ones, thus we remove it from the peer list
+       // As we are fetching blocks backwards, if this node doesn't have the block it likely doesn't
+       // have any prior ones, thus we remove it from the peer list.
        r.dispatcher.removePeer(peer)
        continue
    }

@@ -622,7 +628,6 @@ func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error {
    case *ssproto.LightBlockResponse:
        if err := r.dispatcher.respond(msg.LightBlock, envelope.From); err != nil {
            r.Logger.Error("error processing light block response", "err", err)
-           return err
        }

    default:

@@ -639,6 +644,11 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err
    defer func() {
        if e := recover(); e != nil {
            err = fmt.Errorf("panic in processing message: %v", e)
            r.Logger.Error(
                "recovering from processing message panic",
                "err", err,
                "stack", string(debug.Stack()),
            )
        }
    }()
@@ -3,12 +3,11 @@ package statesync

import (
    "context"
    "fmt"
-   "math/rand"
    "sync"
    "testing"
    "time"

-   // "github.com/fortytw2/leaktest"
+   "github.com/fortytw2/leaktest"
    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/require"
    dbm "github.com/tendermint/tm-db"

@@ -421,11 +420,11 @@ func TestReactor_Dispatcher(t *testing.T) {

func TestReactor_Backfill(t *testing.T) {
    // test backfill algorithm with varying failure rates [0, 10]
-   failureRates := []int{0, 3, 9}
+   failureRates := []int{0, 2, 9}
    for _, failureRate := range failureRates {
        failureRate := failureRate
        t.Run(fmt.Sprintf("failure rate: %d", failureRate), func(t *testing.T) {
-           // t.Cleanup(leaktest.Check(t))
+           t.Cleanup(leaktest.CheckTimeout(t, 1*time.Minute))
            rts := setup(t, nil, nil, nil, 21)

            var (

@@ -467,7 +466,7 @@ func TestReactor_Backfill(t *testing.T) {
        factory.MakeBlockIDWithHash(chain[startHeight].Header.Hash()),
        stopTime,
    )
-   if failureRate > 5 {
+   if failureRate > 3 {
        require.Error(t, err)
    } else {
        require.NoError(t, err)

@@ -506,6 +505,7 @@ func handleLightBlockRequests(t *testing.T,
    close chan struct{},
    failureRate int) {
    requests := 0
+   errorCount := 0
    for {
        select {
        case envelope := <-receiving:

@@ -520,7 +520,7 @@ func handleLightBlockRequests(t *testing.T,
            },
        }
    } else {
-       switch rand.Intn(3) {
+       switch errorCount % 3 {
        case 0: // send a different block
            differntLB, err := mockLB(t, int64(msg.Height), factory.DefaultTestTime, factory.MakeBlockID()).ToProto()
            require.NoError(t, err)

@@ -539,6 +539,7 @@ func handleLightBlockRequests(t *testing.T,
            }
        case 2: // don't do anything
        }
+       errorCount++
    }
}
case <-close:
@@ -38,12 +38,16 @@ var (
    errRejectFormat = errors.New("snapshot format was rejected")
    // errRejectSender is returned by Sync() when the snapshot sender is rejected.
    errRejectSender = errors.New("snapshot sender was rejected")
-   // errVerifyFailed is returned by Sync() when app hash or last height verification fails.
+   // errVerifyFailed is returned by Sync() when app hash or last height
+   // verification fails.
    errVerifyFailed = errors.New("verification failed")
    // errTimeout is returned by Sync() when we've waited too long to receive a chunk.
    errTimeout = errors.New("timed out waiting for chunk")
    // errNoSnapshots is returned by SyncAny() if no snapshots are found and discovery is disabled.
    errNoSnapshots = errors.New("no suitable snapshots found")
+   // errStateCommitTimeout is returned by Sync() when the timeout for retrieving
+   // tendermint state or the commit is exceeded
+   errStateCommitTimeout = errors.New("timed out trying to retrieve state and commit")
)

// syncer runs a state sync against an ABCI app. Use either SyncAny() to automatically attempt to

@@ -226,6 +230,10 @@ func (s *syncer) SyncAny(
        s.logger.Info("Snapshot sender rejected", "peer", peer)
    }

+   case errors.Is(err, errStateCommitTimeout):
+       s.logger.Info("Timed out retrieving state and commit, rejecting and retrying...", "height", snapshot.Height)
+       s.snapshots.Reject(snapshot)
+
    default:
        return sm.State{}, nil, fmt.Errorf("snapshot restoration failed: %w", err)
    }

@@ -275,10 +283,20 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueu
    // Optimistically build new state, so we don't discover any light client failures at the end.
    state, err := s.stateProvider.State(pctx, snapshot.Height)
    if err != nil {
+       // check if the provider context exceeded the 10 second deadline
+       if err == context.DeadlineExceeded && ctx.Err() == nil {
+           return sm.State{}, nil, errStateCommitTimeout
+       }
+
        return sm.State{}, nil, fmt.Errorf("failed to build new state: %w", err)
    }
    commit, err := s.stateProvider.Commit(pctx, snapshot.Height)
    if err != nil {
+       // check if the provider context exceeded the 10 second deadline
+       if err == context.DeadlineExceeded && ctx.Err() == nil {
+           return sm.State{}, nil, errStateCommitTimeout
+       }
+
        return sm.State{}, nil, fmt.Errorf("failed to fetch commit: %w", err)
    }
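The added check err == context.DeadlineExceeded && ctx.Err() == nil is what separates "the provider's own deadline expired" from "the caller cancelled us": if the outer context is still live, the timeout is attributed to the state/commit fetch and surfaced as the retryable errStateCommitTimeout. A small sketch of that distinction (a 10ms inner deadline stands in for the 10-second one named in the comments, and the never-answering provider is simulated):

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

var errStateCommitTimeout = errors.New("timed out trying to retrieve state and commit")

func fetchState(ctx context.Context) error {
    // inner context with its own short deadline, like pctx in Sync()
    pctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
    defer cancel()
    <-pctx.Done() // simulate a provider that never answers
    err := pctx.Err()
    // inner deadline fired while the outer ctx is still alive: this is a
    // provider timeout, not a caller cancellation
    if errors.Is(err, context.DeadlineExceeded) && ctx.Err() == nil {
        return errStateCommitTimeout
    }
    return err
}

func main() {
    err := fetchState(context.Background())
    fmt.Println(errors.Is(err, errStateCommitTimeout)) // true
}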
33  libs/sync/atomic_bool.go  Normal file
@@ -0,0 +1,33 @@
package sync

import "sync/atomic"

// AtomicBool is an atomic Boolean.
// Its methods are all atomic, thus safe to be called by multiple goroutines simultaneously.
// Note: When embedding into a struct one should always use *AtomicBool to avoid copying.
// It's a simple implementation taken from https://github.com/tevino/abool
type AtomicBool int32

// NewBool creates an AtomicBool with the given default value.
func NewBool(ok bool) *AtomicBool {
    ab := new(AtomicBool)
    if ok {
        ab.Set()
    }
    return ab
}

// Set sets the Boolean to true.
func (ab *AtomicBool) Set() {
    atomic.StoreInt32((*int32)(ab), 1)
}

// UnSet sets the Boolean to false.
func (ab *AtomicBool) UnSet() {
    atomic.StoreInt32((*int32)(ab), 0)
}

// IsSet returns whether the Boolean is true.
func (ab *AtomicBool) IsSet() bool {
    return atomic.LoadInt32((*int32)(ab))&1 == 1
}
27  libs/sync/atomic_bool_test.go  Normal file
@@ -0,0 +1,27 @@
package sync

import (
    "testing"

    "github.com/stretchr/testify/assert"
)

func TestDefaultValue(t *testing.T) {
    t.Parallel()
    v := NewBool(false)
    assert.False(t, v.IsSet())

    v = NewBool(true)
    assert.True(t, v.IsSet())
}

func TestSetUnSet(t *testing.T) {
    t.Parallel()
    v := NewBool(false)

    v.Set()
    assert.True(t, v.IsSet())

    v.UnSet()
    assert.False(t, v.IsSet())
}
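Beyond what the unit tests cover, the intended use is a flag that one goroutine flips and another reads without a mutex. A short usage sketch (the type is inlined here so the example is self-contained):

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// mirror of the added AtomicBool type
type AtomicBool int32

func (ab *AtomicBool) Set()        { atomic.StoreInt32((*int32)(ab), 1) }
func (ab *AtomicBool) UnSet()      { atomic.StoreInt32((*int32)(ab), 0) }
func (ab *AtomicBool) IsSet() bool { return atomic.LoadInt32((*int32)(ab))&1 == 1 }

func main() {
    var stopped AtomicBool
    var wg sync.WaitGroup
    wg.Add(1)
    go func() { defer wg.Done(); stopped.Set() }() // writer goroutine
    wg.Wait()
    fmt.Println(stopped.IsSet()) // true, with no data race
}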
@@ -909,6 +909,10 @@ func (c *Client) lightBlockFromPrimary(ctx context.Context, height int64) (*type
    // Everything went smoothly. We reset the lightBlockRequests and return the light block
    return l, nil

+   // catch canceled contexts or deadlines
+   case context.Canceled, context.DeadlineExceeded:
+       return nil, err
+
    case provider.ErrNoResponse, provider.ErrLightBlockNotFound, provider.ErrHeightTooHigh:
        // we find a new witness to replace the primary
        c.logger.Debug("error from light block request from primary, replacing...",

@@ -1011,6 +1015,10 @@ func (c *Client) findNewPrimary(ctx context.Context, height int64, remove bool)
    // return the light block that new primary responded with
    return response.lb, nil

+   // catch canceled contexts or deadlines
+   case context.Canceled, context.DeadlineExceeded:
+       return nil, response.err
+
    // process benign errors by logging them only
    case provider.ErrNoResponse, provider.ErrLightBlockNotFound, provider.ErrHeightTooHigh:
        lastError = response.err

@@ -1067,7 +1075,13 @@ and remove witness. Otherwise, use a different primary`, e.WitnessIndex), "witne
        "witness", c.witnesses[e.WitnessIndex],
        "err", err)
    witnessesToRemove = append(witnessesToRemove, e.WitnessIndex)
-default: // the witness either didn't respond or didn't have the block. We ignore it.
+default:
+   // check for canceled contexts or deadlines
+   if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+       return err
+   }
+
+   // the witness either didn't respond or didn't have the block. We ignore it.
    c.logger.Debug("unable to compare first header with witness",
        "err", err)
}
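Note the two idioms in this file: the bare switch on the error value matches the exact sentinel values context.Canceled and context.DeadlineExceeded (sufficient when the providers return them unwrapped, as in the hunks above), while the default branch uses errors.Is so that wrapped context errors are also caught. A quick demonstration of the difference:

package main

import (
    "context"
    "errors"
    "fmt"
)

func main() {
    // a context error wrapped the way fmt.Errorf("%w", ...) produces it
    wrapped := fmt.Errorf("light block request: %w", context.Canceled)

    switch wrapped {
    case context.Canceled, context.DeadlineExceeded:
        fmt.Println("switch matched")
    default:
        fmt.Println("switch missed: value comparison ignores wrapping")
    }

    fmt.Println("errors.Is:", errors.Is(wrapped, context.Canceled)) // true
}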
@@ -19,7 +19,7 @@ import (

var defaultOptions = Options{
    MaxRetryAttempts:    5,
-   Timeout:             3 * time.Second,
+   Timeout:             5 * time.Second,
    NoBlockThreshold:    5,
    NoResponseThreshold: 5,
}

@@ -125,7 +125,7 @@ func (p *http) LightBlock(ctx context.Context, height int64) (*types.LightBlock,

    if sh.Header == nil {
        return nil, provider.ErrBadLightBlock{
-           Reason: errors.New("header is nil unexpectedly"),
+           Reason: errors.New("returned header is nil unexpectedly"),
        }
    }

@@ -205,6 +205,11 @@ func (p *http) validatorSet(ctx context.Context, height *int64) (*types.Validato
    return nil, p.parseRPCError(e)

default:
+   // check if the error stems from the context
+   if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+       return nil, err
+   }
+
    // If we don't know the error then by default we return an unreliable provider error and
    // terminate the connection with the peer.
    return nil, provider.ErrUnreliableProvider{Reason: e.Error()}

@@ -236,11 +241,19 @@ func (p *http) signedHeader(ctx context.Context, height *int64) (*types.SignedHe
    return &commit.SignedHeader, nil

case *url.Error:
+   // check if the request timed out
+   if e.Timeout() {
        // we wait and try again with exponential backoff
        time.Sleep(backoffTimeout(attempt))
        continue
+   }
+
+   // check if the connection was refused or dropped
+   if strings.Contains(e.Error(), "connection refused") {
+       return nil, provider.ErrConnectionClosed
+   }
+
+   // else, as a catch all, we return the error as a bad light block response
+   return nil, provider.ErrBadLightBlock{Reason: e}

case *rpctypes.RPCError:

@@ -248,6 +261,11 @@ func (p *http) signedHeader(ctx context.Context, height *int64) (*types.SignedHe
    return nil, p.parseRPCError(e)

default:
+   // check if the error stems from the context
+   if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+       return nil, err
+   }
+
    // If we don't know the error then by default we return an unreliable provider error and
    // terminate the connection with the peer.
    return nil, provider.ErrUnreliableProvider{Reason: e.Error()}
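backoffTimeout is called here but defined elsewhere in the provider package. A plausible exponential backoff consistent with the comment might look like the following sketch — the doubling schedule and the attempt parameter type are assumptions for illustration, not the provider's actual constants:

package main

import (
    "fmt"
    "math"
    "time"
)

// backoffTimeout is a hypothetical stand-in for the helper referenced above:
// wait 2^attempt seconds before the next retry.
func backoffTimeout(attempt uint16) time.Duration {
    return time.Duration(math.Pow(2, float64(attempt))) * time.Second
}

func main() {
    for attempt := uint16(0); attempt < 4; attempt++ {
        fmt.Printf("attempt %d: wait %v\n", attempt, backoffTimeout(attempt)) // 1s, 2s, 4s, 8s
    }
}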
@@ -4,13 +4,12 @@ import (
    "context"
    "fmt"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"

    "github.com/tendermint/tendermint/abci/example/kvstore"
    "github.com/tendermint/tendermint/config"
    "github.com/tendermint/tendermint/libs/service"
    "github.com/tendermint/tendermint/light/provider"
    lighthttp "github.com/tendermint/tendermint/light/provider/http"
    rpcclient "github.com/tendermint/tendermint/rpc/client"

@@ -33,30 +32,17 @@ func TestNewProvider(t *testing.T) {
    require.Equal(t, fmt.Sprintf("%s", c), "http{http://153.200.0.1}")
}

-// NodeSuite initiates and runs a full node instance in the
-// background, stopping it once the test is completed
-func NodeSuite(t *testing.T) (service.Service, *config.Config) {
-   t.Helper()
-
+func TestProvider(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())

-   conf := rpctest.CreateConfig(t.Name())
+   defer cancel()
+   cfg := rpctest.CreateConfig(t.Name())

    // start a tendermint node in the background to test against
    app := kvstore.NewApplication()
    app.RetainBlocks = 9

-   node, closer, err := rpctest.StartTendermint(ctx, conf, app)
+   _, closer, err := rpctest.StartTendermint(ctx, cfg, app)
    require.NoError(t, err)
-   t.Cleanup(func() {
-       _ = closer(ctx)
-       cancel()
-   })
-   return node, conf
-}
-
-func TestProvider(t *testing.T) {
-   _, cfg := NodeSuite(t)
    rpcAddr := cfg.RPC.ListenAddress
    genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile())
    require.NoError(t, err)

@@ -95,8 +81,9 @@ func TestProvider(t *testing.T) {
    require.Nil(t, lb)
    assert.Equal(t, provider.ErrHeightTooHigh, err)

-   _, err = p.LightBlock(context.Background(), 1)
+   lb, err = p.LightBlock(context.Background(), 1)
    require.Error(t, err)
+   require.Nil(t, lb)
    assert.Equal(t, provider.ErrLightBlockNotFound, err)

    // if the provider is unable to provide four more blocks then we should return

@@ -105,4 +92,15 @@ func TestProvider(t *testing.T) {
        _, err = p.LightBlock(context.Background(), 1)
    }
    assert.IsType(t, provider.ErrUnreliableProvider{}, err)

+   // shut down tendermint node
+   require.NoError(t, closer(ctx))
+   cancel()
+
+   time.Sleep(10 * time.Second)
+   lb, err = p.LightBlock(context.Background(), lower+2)
+   // we should see a connection refused
+   require.Error(t, err)
+   require.Nil(t, lb)
+   assert.Equal(t, provider.ErrConnectionClosed, err)
}
@@ -81,13 +81,14 @@ func createAndStartIndexerService(

    eventSinks := []indexer.EventSink{}

-   // Check duplicated sinks.
+   // check for duplicated sinks
    sinks := map[string]bool{}
    for _, s := range config.TxIndex.Indexer {
        sl := strings.ToLower(s)
        if sinks[sl] {
            return nil, nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
        }
+
        sinks[sl] = true
    }

@@ -95,25 +96,31 @@ loop:
    for k := range sinks {
        switch k {
        case string(indexer.NULL):
-           // when we see null in the config, the eventsinks will be reset with the nullEventSink.
+           // When we see null in the config, the eventsinks will be reset with the
+           // nullEventSink.
            eventSinks = []indexer.EventSink{null.NewEventSink()}
            break loop

        case string(indexer.KV):
            store, err := dbProvider(&cfg.DBContext{ID: "tx_index", Config: config})
            if err != nil {
                return nil, nil, err
            }
+
            eventSinks = append(eventSinks, kv.NewEventSink(store))

        case string(indexer.PSQL):
            conn := config.TxIndex.PsqlConn
            if conn == "" {
                return nil, nil, errors.New("the psql connection settings cannot be empty")
            }
+
            es, _, err := psql.NewEventSink(conn, chainID)
            if err != nil {
                return nil, nil, err
            }
            eventSinks = append(eventSinks, es)

        default:
            return nil, nil, errors.New("unsupported event sink type")
        }
@@ -5,7 +5,6 @@ import (
    "io"

    abcicli "github.com/tendermint/tendermint/abci/client"
-   "github.com/tendermint/tendermint/abci/example/counter"
    "github.com/tendermint/tendermint/abci/example/kvstore"
    "github.com/tendermint/tendermint/abci/types"
    tmsync "github.com/tendermint/tendermint/internal/libs/sync"

@@ -68,17 +67,13 @@ func (r *remoteClientCreator) NewABCIClient() (abcicli.Client, error) {
}

// DefaultClientCreator returns a default ClientCreator, which will create a
-// local client if addr is one of: 'counter', 'counter_serial', 'kvstore',
+// local client if addr is one of: 'kvstore',
// 'persistent_kvstore' or 'noop', otherwise - a remote client.
//
// The Closer is a noop except for persistent_kvstore applications,
// which will clean up the store.
func DefaultClientCreator(addr, transport, dbDir string) (ClientCreator, io.Closer) {
    switch addr {
-   case "counter":
-       return NewLocalClientCreator(counter.NewApplication(false)), noopCloser{}
-   case "counter_serial":
-       return NewLocalClientCreator(counter.NewApplication(true)), noopCloser{}
    case "kvstore":
        return NewLocalClientCreator(kvstore.NewApplication()), noopCloser{}
    case "persistent_kvstore":
@@ -120,7 +120,7 @@ func TestBroadcastEvidence_DuplicateVoteEvidence(t *testing.T) {
    // previous versions of this test used a shared fixture with
    // other tests, and in this version we give it a little time
    // for the node to make progress before running the test
-   time.Sleep(10 * time.Millisecond)
+   time.Sleep(100 * time.Millisecond)

    chainID := config.ChainID()
@@ -3,6 +3,7 @@ package mock_test

import (
    "context"
    "testing"
+   "time"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"

@@ -23,6 +24,8 @@ func TestStatus(t *testing.T) {
    LatestAppHash:      bytes.HexBytes("app"),
    LatestBlockHeight:  10,
    MaxPeerBlockHeight: 20,
+   TotalSyncedTime:    time.Second,
+   RemainingTime:      time.Minute,
    },
}},
}

@@ -36,6 +39,8 @@ func TestStatus(t *testing.T) {
    assert.EqualValues("block", status.SyncInfo.LatestBlockHash)
    assert.EqualValues(10, status.SyncInfo.LatestBlockHeight)
    assert.EqualValues(20, status.SyncInfo.MaxPeerBlockHeight)
+   assert.EqualValues(time.Second, status.SyncInfo.TotalSyncedTime)
+   assert.EqualValues(time.Minute, status.SyncInfo.RemainingTime)

    // make sure recorder works properly
    require.Equal(1, len(r.Calls))

@@ -49,4 +54,6 @@ func TestStatus(t *testing.T) {
    assert.EqualValues("block", st.SyncInfo.LatestBlockHash)
    assert.EqualValues(10, st.SyncInfo.LatestBlockHeight)
    assert.EqualValues(20, st.SyncInfo.MaxPeerBlockHeight)
+   assert.EqualValues(time.Second, status.SyncInfo.TotalSyncedTime)
+   assert.EqualValues(time.Minute, status.SyncInfo.RemainingTime)
}
@@ -71,6 +71,8 @@ func (env *Environment) Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, err
    EarliestBlockTime:  time.Unix(0, earliestBlockTimeNano),
    MaxPeerBlockHeight: env.FastSyncReactor.GetMaxPeerBlockHeight(),
    CatchingUp:         env.ConsensusReactor.WaitSync(),
+   TotalSyncedTime:    env.FastSyncReactor.GetTotalSyncedTime(),
+   RemainingTime:      env.FastSyncReactor.GetRemainingSyncTime(),
},
ValidatorInfo: validatorInfo,
}
@@ -98,6 +98,9 @@ type SyncInfo struct {
    MaxPeerBlockHeight int64 `json:"max_peer_block_height"`

    CatchingUp bool `json:"catching_up"`
+
+   TotalSyncedTime time.Duration `json:"total_synced_time"`
+   RemainingTime   time.Duration `json:"remaining_time"`
}

// Info about the node's validator
@@ -1330,6 +1330,12 @@ components:
        catching_up:
          type: boolean
          example: false
+       total_synced_time:
+         type: string
+         example: "1000000000"
+       remaining_time:
+         type: string
+         example: "0"
    ValidatorInfo:
      type: object
      properties:
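The quoted examples above follow from time.Duration being an int64 count of nanoseconds: one second is 1000000000. With the standard library encoder the value appears as a bare number; Tendermint's RPC JSON encoding additionally renders 64-bit integers as strings, which is presumably why the spec declares these fields as strings. A minimal sketch with encoding/json:

package main

import (
    "encoding/json"
    "fmt"
    "time"
)

// trimmed-down mirror of SyncInfo's new duration fields
type syncInfo struct {
    TotalSyncedTime time.Duration `json:"total_synced_time"`
    RemainingTime   time.Duration `json:"remaining_time"`
}

func main() {
    b, _ := json.Marshal(syncInfo{TotalSyncedTime: time.Second})
    // time.Duration serializes as its nanosecond count:
    // {"total_synced_time":1000000000,"remaining_time":0}
    fmt.Println(string(b))
}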
@@ -18,7 +18,6 @@ import (
    "sync"

    "github.com/google/orderedcode"
-   "github.com/pkg/errors"
    dbm "github.com/tendermint/tm-db"
)

@@ -395,7 +394,7 @@ func Migrate(ctx context.Context, db dbm.DB) error {

    // check the error results
    if len(errs) != 0 {
-       return errors.Errorf("encountered errors during migration: %v", errStrs)
+       return fmt.Errorf("encountered errors during migration: %v", errStrs)
    }

    return nil
@@ -110,6 +110,7 @@ func (es *EventSink) IndexTxEvents(txr []*abci.TxResult) error {
    if err != nil {
        return err
    }
+   defer r.Close()

    if !r.Next() {
        return nil

@@ -293,6 +293,7 @@ func verifyBlock(h int64) (bool, error) {
    if err != nil {
        return false, err
    }
+   defer rows.Close()

    if !rows.Next() {
        return false, nil

@@ -308,6 +309,7 @@ func verifyBlock(h int64) (bool, error) {
    if err != nil {
        return false, err
    }
+   defer rows.Close()

    return rows.Next(), nil
}
@@ -25,9 +25,7 @@ CREATE TABLE tx_events (
    created_at TIMESTAMPTZ NOT NULL,
    chain_id VARCHAR NOT NULL,
    UNIQUE (hash, key),
-   FOREIGN KEY (tx_result_id)
-       REFERENCES tx_results(id)
-       ON DELETE CASCADE
+   FOREIGN KEY (tx_result_id) REFERENCES tx_results(id) ON DELETE CASCADE
);
CREATE INDEX idx_block_events_key_value ON block_events(key, value);
CREATE INDEX idx_tx_events_key_value ON tx_events(key, value);
@@ -10,8 +10,6 @@ and run the following tests in docker containers:
- includes test coverage
- app tests
    - kvstore app over socket
-   - counter app over socket
-   - counter app over grpc
- persistence tests
    - crash tendermint at each of many predefined points, restart, and ensure it syncs properly with the app
@@ -2,9 +2,6 @@
set -ex

#- kvstore over socket, curl
-#- counter over socket, curl
-#- counter over grpc, curl
-#- counter over grpc, grpc

# TODO: install everything

@@ -45,57 +42,6 @@ function kvstore_over_socket_reorder(){
    kill -9 $pid_kvstore $pid_tendermint
}

-
-function counter_over_socket() {
-    rm -rf $TMHOME
-    tendermint init validator
-    echo "Starting counter_over_socket"
-    abci-cli counter --serial > /dev/null &
-    pid_counter=$!
-    tendermint start --mode validator > tendermint.log &
-    pid_tendermint=$!
-    sleep 5
-
-    echo "running test"
-    bash test/app/counter_test.sh "Counter over Socket"
-
-    kill -9 $pid_counter $pid_tendermint
-}
-
-function counter_over_grpc() {
-    rm -rf $TMHOME
-    tendermint init validator
-    echo "Starting counter_over_grpc"
-    abci-cli counter --serial --abci grpc > /dev/null &
-    pid_counter=$!
-    tendermint start --mode validator --abci grpc > tendermint.log &
-    pid_tendermint=$!
-    sleep 5
-
-    echo "running test"
-    bash test/app/counter_test.sh "Counter over GRPC"
-
-    kill -9 $pid_counter $pid_tendermint
-}
-
-function counter_over_grpc_grpc() {
-    rm -rf $TMHOME
-    tendermint init validator
-    echo "Starting counter_over_grpc_grpc (ie. with grpc broadcast_tx)"
-    abci-cli counter --serial --abci grpc > /dev/null &
-    pid_counter=$!
-    sleep 1
-    GRPC_PORT=36656
-    tendermint start --mode validator --abci grpc --rpc.grpc-laddr tcp://localhost:$GRPC_PORT > tendermint.log &
-    pid_tendermint=$!
-    sleep 5
-
-    echo "running test"
-    GRPC_BROADCAST_TX=true bash test/app/counter_test.sh "Counter over GRPC via GRPC BroadcastTx"
-
-    kill -9 $pid_counter $pid_tendermint
-}
-
case "$1" in
"kvstore_over_socket")
    kvstore_over_socket

@@ -103,25 +49,10 @@ case "$1" in
"kvstore_over_socket_reorder")
    kvstore_over_socket_reorder
    ;;
-"counter_over_socket")
-    counter_over_socket
-    ;;
-"counter_over_grpc")
-    counter_over_grpc
-    ;;
-"counter_over_grpc_grpc")
-    counter_over_grpc_grpc
-    ;;
*)
    echo "Running all"
    kvstore_over_socket
    echo ""
    kvstore_over_socket_reorder
-    echo ""
-    counter_over_socket
-    echo ""
-    counter_over_grpc
-    echo ""
-    counter_over_grpc_grpc
esac
@@ -14,9 +14,9 @@ var (
    // testnetCombinations defines global testnet options, where we generate a
    // separate testnet for each combination (Cartesian product) of options.
    testnetCombinations = map[string][]interface{}{
-       "topology":      {"single", "quad", "large"},
-       "ipv6":          {false, true},
-       "p2p":           {NewP2PMode, LegacyP2PMode, HybridP2PMode},
+       "topology":      {"quad", "large"},
+       "ipv6":          {false},
+       "p2p":           {NewP2PMode},
        "queueType":     {"priority"}, // "fifo", "wdrr"
        "initialHeight": {0, 1000},
        "initialState": {

@@ -24,14 +24,13 @@ var (
        map[string]string{"initial01": "a", "initial02": "b", "initial03": "c"},
        },
        "validators": {"genesis", "initchain"},
-       "keyType":    {types.ABCIPubKeyTypeEd25519, types.ABCIPubKeyTypeSecp256k1},
+       "keyType":    {types.ABCIPubKeyTypeEd25519},
    }

    // The following specify randomly chosen values for testnet nodes.
-   nodeDatabases = uniformChoice{"goleveldb", "cleveldb", "rocksdb", "boltdb", "badgerdb"}
-   // FIXME: grpc disabled due to https://github.com/tendermint/tendermint/issues/5439
-   nodeABCIProtocols    = uniformChoice{"unix", "tcp", "builtin"} // "grpc"
-   nodePrivvalProtocols = uniformChoice{"file", "unix", "tcp", "grpc"}
+   nodeDatabases        = uniformChoice{"badgerdb"}
+   nodeABCIProtocols    = uniformChoice{"builtin"}
+   nodePrivvalProtocols = uniformChoice{"file"}
    // FIXME: v2 disabled due to flake
    nodeFastSyncs  = uniformChoice{"v0"} // "v2"
    nodeStateSyncs = uniformChoice{false, true}

@@ -45,6 +44,7 @@ var (
        "restart": 0.1,
    }
    evidence = uniformChoice{0, 1, 10}
+   txSize   = uniformChoice{1024, 10240} // either 1kb or 10kb
)

// Generate generates random testnets using the given RNG.

@@ -93,6 +93,7 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
    KeyType:   opt["keyType"].(string),
    Evidence:  evidence.Choose(r).(int),
    QueueType: opt["queueType"].(string),
+   TxSize:    int64(txSize.Choose(r).(int)),
    }

    var p2pNodeFactor int

@@ -128,7 +129,6 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
    // First we generate seed nodes, starting at the initial height.
    for i := 1; i <= numSeeds; i++ {
        node := generateNode(r, e2e.ModeSeed, 0, manifest.InitialHeight, false)
-       node.QueueType = manifest.QueueType

        if p2pNodeFactor == 0 {
            node.DisableLegacyP2P = manifest.DisableLegacyP2P

@@ -154,7 +154,6 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
    node := generateNode(
        r, e2e.ModeValidator, startAt, manifest.InitialHeight, i <= 2)

-   node.QueueType = manifest.QueueType
    if p2pNodeFactor == 0 {
        node.DisableLegacyP2P = manifest.DisableLegacyP2P
    } else if p2pNodeFactor%i == 0 {

@@ -190,7 +189,7 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
        nextStartAt += 5
    }
    node := generateNode(r, e2e.ModeFull, startAt, manifest.InitialHeight, false)
-   node.QueueType = manifest.QueueType

    if p2pNodeFactor == 0 {
        node.DisableLegacyP2P = manifest.DisableLegacyP2P
    } else if p2pNodeFactor%i == 0 {
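uniformChoice is used throughout the generator but defined elsewhere in the package; a minimal stand-in consistent with these call sites (a slice whose Choose method picks one option uniformly using the shared RNG) would look like this:

package main

import (
    "fmt"
    "math/rand"
)

// hypothetical stand-in matching how uniformChoice is used above
type uniformChoice []interface{}

func (uc uniformChoice) Choose(r *rand.Rand) interface{} {
    return uc[r.Intn(len(uc))]
}

func main() {
    r := rand.New(rand.NewSource(1))
    txSize := uniformChoice{1024, 10240}
    // each Choose draws one of the options with equal probability
    fmt.Println(txSize.Choose(r).(int))
}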
@@ -52,8 +52,7 @@ seeds = ["seed02"]
[node.validator03]
database = "badgerdb"
seeds = ["seed01"]
-# FIXME: should be grpc, disabled due to https://github.com/tendermint/tendermint/issues/5439
-#abci_protocol = "grpc"
+abci_protocol = "grpc"
persist_interval = 3
perturb = ["kill"]
privval_protocol = "grpc"

@@ -70,8 +69,7 @@ database = "cleveldb"
fast_sync = "v0"
seeds = ["seed02"]
start_at = 1005 # Becomes part of the validator set at 1010
-# FIXME: should be grpc, disabled due to https://github.com/tendermint/tendermint/issues/5439
-#abci_protocol = "grpc"
+abci_protocol = "grpc"
perturb = ["kill", "pause", "disconnect", "restart"]
privval_protocol = "tcp"
@@ -64,6 +64,9 @@ type Manifest struct {

    // QueueType describes the type of queue that the system uses internally
    QueueType string `toml:"queue_type"`
+
+   // Number of bytes per tx. Default is 1kb (1024)
+   TxSize int64
}

// ManifestNode represents a node in a testnet manifest.

@@ -143,10 +146,6 @@ type ManifestNode struct {

    // UseNewP2P enables use of the new p2p layer for this node.
    DisableLegacyP2P bool `toml:"disable_legacy_p2p"`
-
-   // QueueType describes the type of queue that the p2p layer
-   // uses internally.
-   QueueType string `toml:"queue_type"`
}

// Save saves the testnet manifest to a file.
@@ -66,6 +66,7 @@ type Testnet struct {
    KeyType  string
    Evidence int
    LogLevel string
+   TxSize   int64
}

// Node represents a Tendermint node in a testnet.

@@ -133,10 +134,14 @@ func LoadTestnet(file string) (*Testnet, error) {
    Evidence: manifest.Evidence,
    KeyType:  "ed25519",
    LogLevel: manifest.LogLevel,
+   TxSize:   manifest.TxSize,
    }
    if len(manifest.KeyType) != 0 {
        testnet.KeyType = manifest.KeyType
    }
+   if testnet.TxSize <= 0 {
+       testnet.TxSize = 1024
+   }
    if manifest.InitialHeight > 0 {
        testnet.InitialHeight = manifest.InitialHeight
    }

@@ -169,8 +174,8 @@ func LoadTestnet(file string) (*Testnet, error) {
    RetainBlocks:  nodeManifest.RetainBlocks,
    Perturbations: []Perturbation{},
    LogLevel:      manifest.LogLevel,
-   DisableLegacyP2P: manifest.DisableLegacyP2P,
+   QueueType:        manifest.QueueType,
+   DisableLegacyP2P: manifest.DisableLegacyP2P || nodeManifest.DisableLegacyP2P,
    }

    if node.StartAt == testnet.InitialHeight {

@@ -320,6 +325,11 @@ func (n Node) Validate(testnet Testnet) error {
    default:
        return fmt.Errorf("invalid fast sync setting %q", n.FastSync)
    }
+   switch n.QueueType {
+   case "", "priority", "wdrr", "fifo":
+   default:
+       return fmt.Errorf("unsupported p2p queue type: %s", n.QueueType)
+   }
    switch n.Database {
    case "goleveldb", "cleveldb", "boltdb", "rocksdb", "badgerdb":
    default:
@@ -33,14 +33,14 @@ func execVerbose(args ...string) error {
// execCompose runs a Docker Compose command for a testnet.
func execCompose(dir string, args ...string) error {
    return exec(append(
-       []string{"docker-compose", "-f", filepath.Join(dir, "docker-compose.yml")},
+       []string{"docker-compose", "--ansi=never", "-f", filepath.Join(dir, "docker-compose.yml")},
        args...)...)
}

// execComposeVerbose runs a Docker Compose command for a testnet and displays its output.
func execComposeVerbose(dir string, args ...string) error {
    return execVerbose(append(
-       []string{"docker-compose", "-f", filepath.Join(dir, "docker-compose.yml")},
+       []string{"docker-compose", "--ansi=never", "-f", filepath.Join(dir, "docker-compose.yml")},
        args...)...)
}
@@ -38,7 +38,7 @@ func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error {
    logger.Info(fmt.Sprintf("Starting transaction load (%v workers)...", concurrency))
    started := time.Now()

-   go loadGenerate(ctx, chTx, multiplier)
+   go loadGenerate(ctx, chTx, multiplier, testnet.TxSize)

    for w := 0; w < concurrency; w++ {
        go loadProcess(ctx, testnet, chTx, chSuccess)

@@ -66,13 +66,13 @@ func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error {
}

// loadGenerate generates jobs until the context is canceled
-func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int) {
+func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int, size int64) {
    for i := 0; i < math.MaxInt64; i++ {
-       // We keep generating the same 1000 keys over and over, with different values.
+       // We keep generating the same 100 keys over and over, with different values.
        // This gives a reasonable load without putting too much data in the app.
-       id := i % 1000
+       id := i % 100

-       bz := make([]byte, 1024) // 1kb hex-encoded
+       bz := make([]byte, size)
        _, err := rand.Read(bz)
        if err != nil {
            panic(fmt.Sprintf("Failed to read random bytes: %v", err))

@@ -81,7 +81,8 @@ func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int) {

    select {
    case chTx <- tx:
-       time.Sleep(time.Second / time.Duration(multiplier))
+       sqrtSize := int(math.Sqrt(float64(size)))
+       time.Sleep(10 * time.Millisecond * time.Duration(sqrtSize/multiplier))

    case <-ctx.Done():
        close(chTx)
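The new pacing ties the inter-transaction sleep to transaction size: sqrt(size) bytes, scaled by 10ms and divided by the multiplier, so larger transactions are sent less often. A quick check of what that yields for the two generated sizes (1kb and 10kb, multiplier 1):

package main

import (
    "fmt"
    "math"
    "time"
)

// pace mirrors the sleep computation from loadGenerate above.
func pace(size int64, multiplier int) time.Duration {
    sqrtSize := int(math.Sqrt(float64(size)))
    return 10 * time.Millisecond * time.Duration(sqrtSize/multiplier)
}

func main() {
    fmt.Println(pace(1024, 1))  // sqrt(1024)=32  -> 320ms between 1kb txs
    fmt.Println(pace(10240, 1)) // sqrt(10240)~101 -> 1.01s between 10kb txs
}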
@@ -61,9 +61,6 @@ func NewCLI() *CLI {
    defer loadCancel()
    go func() {
        err := Load(ctx, cli.testnet, 1)
-       if err != nil {
-           logger.Error(fmt.Sprintf("Transaction load failed: %v", err.Error()))
-       }
        chLoadResult <- err
    }()

@@ -95,9 +92,9 @@ func NewCLI() *CLI {

    loadCancel()
    if err := <-chLoadResult; err != nil {
-       return err
+       return fmt.Errorf("transaction load failed: %w", err)
    }
-   if err := Wait(cli.testnet, 8); err != nil { // wait for network to settle before tests
+   if err := Wait(cli.testnet, 5); err != nil { // wait for network to settle before tests
        return err
    }
    if err := Test(cli.testnet); err != nil {

@@ -238,10 +235,7 @@ func NewCLI() *CLI {
    Example: "runner logs validator03",
    Args:    cobra.MaximumNArgs(1),
    RunE: func(cmd *cobra.Command, args []string) error {
-       if len(args) == 1 {
-           return execComposeVerbose(cli.testnet.Dir, "logs", args[0])
-       }
-       return execComposeVerbose(cli.testnet.Dir, "logs")
+       return execComposeVerbose(cli.testnet.Dir, append([]string{"logs", "--no-color"}, args...)...)
    },
    })
11  tools/tools.go  Normal file
@@ -0,0 +1,11 @@
// +build tools

// This file uses the recommended method for tracking developer tools in a go module.
//
// https://github.com/golang/go/wiki/Modules#how-can-i-track-tool-dependencies-for-a-module

package tools

import (
    _ "github.com/golangci/golangci-lint/cmd/golangci-lint"
)