Mirror of https://github.com/tendermint/tendermint.git, synced 2026-01-03 11:45:18 +00:00
abci: Adapt unsynchronized local client to replicate remote client concurrency (#9830)
* Revert "abci: Add unsynchronized local client (#9660)"
This reverts commit 45071d1f23.
Signed-off-by: Thane Thomson <connect@thanethomson.com>
* proxy: Add unsync local client creator
Signed-off-by: Thane Thomson <connect@thanethomson.com>
* e2e: Extend tests
Extend the E2E tests to randomly choose between the sync (default) and
unsync (new) local client creator.
Signed-off-by: Thane Thomson <connect@thanethomson.com>
* abci: Remove redundant interface constraint
Signed-off-by: Thane Thomson <connect@thanethomson.com>
* abci: Remove irrelevant doc comment
Signed-off-by: Thane Thomson <connect@thanethomson.com>
* proxy: Remove backticks in doc comments
Signed-off-by: Thane Thomson <connect@thanethomson.com>
* e2e: Remove unnecessary gap between doc comment and struct
Signed-off-by: Thane Thomson <connect@thanethomson.com>
* Add pending changelog entry
Signed-off-by: Thane Thomson <connect@thanethomson.com>
* e2e: Expand on BuiltinProxyMode param docstring
Signed-off-by: Thane Thomson <connect@thanethomson.com>
* Remove builtin proxy mode config option from CI test
Signed-off-by: Thane Thomson <connect@thanethomson.com>
* e2e: Make builtin proxy mode option testnet-wide
Signed-off-by: Thane Thomson <connect@thanethomson.com>
* e2e: Embed sync/unsync notion in node protocol
Randomly generating the proxy mode per testnet resulted in a heavily
uneven ratio of sync to unsync modes across those testnets that happened
to use the "builtin" protocol.
This commit adapts the E2E tests to have a new ABCI protocol option:
"builtin_unsync". This results in a better spread of sync/unsync choices
for generated testnets.
Signed-off-by: Thane Thomson <connect@thanethomson.com>
* e2e: Remove unused type
Signed-off-by: Thane Thomson <connect@thanethomson.com>
Signed-off-by: Thane Thomson <connect@thanethomson.com>
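
To illustrate the concurrency model described above, here is a minimal sketch (not part of this commit; it assumes the kvstore example app and the proxy package at their usual import paths):

package main

import (
	"fmt"

	"github.com/tendermint/tendermint/abci/example/kvstore"
	"github.com/tendermint/tendermint/proxy"
)

func main() {
	app := kvstore.NewApplication()

	// Default local client creator: every client returned by NewABCIClient
	// shares a single mutex, so concurrent ABCI calls serialize globally.
	syncCC := proxy.NewLocalClientCreator(app)

	// Unsync local client creator: each client gets its own mutex, mirroring
	// the remote (socket) client, where each "connection" locks independently
	// and the application is responsible for its own synchronization.
	unsyncCC := proxy.NewUnsyncLocalClientCreator(app)

	for _, cc := range []proxy.ClientCreator{syncCC, unsyncCC} {
		cli, err := cc.NewABCIClient()
		if err != nil {
			panic(err)
		}
		fmt.Printf("%T\n", cli)
	}
}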
@@ -28,6 +28,9 @@
 
 ### FEATURES
 
+- [proxy] \#9830 Introduce `NewUnsyncLocalClientCreator`, which allows local
+  ABCI clients to have the same concurrency model as remote clients (i.e. one
+  mutex per client "connection", for each of the four ABCI "connections").
 - [config] \#9680 Introduce `BootstrapPeers` to the config to allow nodes to list peers to be added to
   the addressbook upon start up (@cmwaters)
 
@@ -1,263 +0,0 @@
-package abcicli
-
-import (
-	"sync"
-
-	types "github.com/tendermint/tendermint/abci/types"
-	"github.com/tendermint/tendermint/libs/service"
-)
-
-type unsyncLocalClient struct {
-	service.BaseService
-
-	types.Application
-
-	// This mutex is exclusively used to protect the callback.
-	mtx sync.RWMutex
-	Callback
-}
-
-var _ Client = (*unsyncLocalClient)(nil)
-
-// NewUnsyncLocalClient creates an unsynchronized local client, which will be
-// directly calling the methods of the given app.
-//
-// Unlike NewLocalClient, it does not hold a mutex around the application, so
-// it is up to the application to manage its synchronization properly.
-func NewUnsyncLocalClient(app types.Application) Client {
-	cli := &unsyncLocalClient{
-		Application: app,
-	}
-	cli.BaseService = *service.NewBaseService(nil, "unsyncLocalClient", cli)
-	return cli
-}
-
-func (app *unsyncLocalClient) SetResponseCallback(cb Callback) {
-	app.mtx.Lock()
-	defer app.mtx.Unlock()
-	app.Callback = cb
-}
-
-// TODO: change types.Application to include Error()?
-func (app *unsyncLocalClient) Error() error {
-	return nil
-}
-
-func (app *unsyncLocalClient) FlushAsync() *ReqRes {
-	// Do nothing
-	return newLocalReqRes(types.ToRequestFlush(), nil)
-}
-
-func (app *unsyncLocalClient) EchoAsync(msg string) *ReqRes {
-	return app.callback(
-		types.ToRequestEcho(msg),
-		types.ToResponseEcho(msg),
-	)
-}
-
-func (app *unsyncLocalClient) InfoAsync(req types.RequestInfo) *ReqRes {
-	res := app.Application.Info(req)
-	return app.callback(
-		types.ToRequestInfo(req),
-		types.ToResponseInfo(res),
-	)
-}
-
-func (app *unsyncLocalClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes {
-	res := app.Application.DeliverTx(params)
-	return app.callback(
-		types.ToRequestDeliverTx(params),
-		types.ToResponseDeliverTx(res),
-	)
-}
-
-func (app *unsyncLocalClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
-	res := app.Application.CheckTx(req)
-	return app.callback(
-		types.ToRequestCheckTx(req),
-		types.ToResponseCheckTx(res),
-	)
-}
-
-func (app *unsyncLocalClient) QueryAsync(req types.RequestQuery) *ReqRes {
-	res := app.Application.Query(req)
-	return app.callback(
-		types.ToRequestQuery(req),
-		types.ToResponseQuery(res),
-	)
-}
-
-func (app *unsyncLocalClient) CommitAsync() *ReqRes {
-	res := app.Application.Commit()
-	return app.callback(
-		types.ToRequestCommit(),
-		types.ToResponseCommit(res),
-	)
-}
-
-func (app *unsyncLocalClient) InitChainAsync(req types.RequestInitChain) *ReqRes {
-	res := app.Application.InitChain(req)
-	return app.callback(
-		types.ToRequestInitChain(req),
-		types.ToResponseInitChain(res),
-	)
-}
-
-func (app *unsyncLocalClient) BeginBlockAsync(req types.RequestBeginBlock) *ReqRes {
-	res := app.Application.BeginBlock(req)
-	return app.callback(
-		types.ToRequestBeginBlock(req),
-		types.ToResponseBeginBlock(res),
-	)
-}
-
-func (app *unsyncLocalClient) EndBlockAsync(req types.RequestEndBlock) *ReqRes {
-	res := app.Application.EndBlock(req)
-	return app.callback(
-		types.ToRequestEndBlock(req),
-		types.ToResponseEndBlock(res),
-	)
-}
-
-func (app *unsyncLocalClient) ListSnapshotsAsync(req types.RequestListSnapshots) *ReqRes {
-	res := app.Application.ListSnapshots(req)
-	return app.callback(
-		types.ToRequestListSnapshots(req),
-		types.ToResponseListSnapshots(res),
-	)
-}
-
-func (app *unsyncLocalClient) OfferSnapshotAsync(req types.RequestOfferSnapshot) *ReqRes {
-	res := app.Application.OfferSnapshot(req)
-	return app.callback(
-		types.ToRequestOfferSnapshot(req),
-		types.ToResponseOfferSnapshot(res),
-	)
-}
-
-func (app *unsyncLocalClient) LoadSnapshotChunkAsync(req types.RequestLoadSnapshotChunk) *ReqRes {
-	res := app.Application.LoadSnapshotChunk(req)
-	return app.callback(
-		types.ToRequestLoadSnapshotChunk(req),
-		types.ToResponseLoadSnapshotChunk(res),
-	)
-}
-
-func (app *unsyncLocalClient) ApplySnapshotChunkAsync(req types.RequestApplySnapshotChunk) *ReqRes {
-	res := app.Application.ApplySnapshotChunk(req)
-	return app.callback(
-		types.ToRequestApplySnapshotChunk(req),
-		types.ToResponseApplySnapshotChunk(res),
-	)
-}
-
-func (app *unsyncLocalClient) PrepareProposalAsync(req types.RequestPrepareProposal) *ReqRes {
-	res := app.Application.PrepareProposal(req)
-	return app.callback(
-		types.ToRequestPrepareProposal(req),
-		types.ToResponsePrepareProposal(res),
-	)
-}
-
-func (app *unsyncLocalClient) ProcessProposalAsync(req types.RequestProcessProposal) *ReqRes {
-	res := app.Application.ProcessProposal(req)
-	return app.callback(
-		types.ToRequestProcessProposal(req),
-		types.ToResponseProcessProposal(res),
-	)
-}
-
-//-------------------------------------------------------
-
-func (app *unsyncLocalClient) FlushSync() error {
-	return nil
-}
-
-func (app *unsyncLocalClient) EchoSync(msg string) (*types.ResponseEcho, error) {
-	return &types.ResponseEcho{Message: msg}, nil
-}
-
-func (app *unsyncLocalClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
-	res := app.Application.Info(req)
-	return &res, nil
-}
-
-func (app *unsyncLocalClient) DeliverTxSync(req types.RequestDeliverTx) (*types.ResponseDeliverTx, error) {
-	res := app.Application.DeliverTx(req)
-	return &res, nil
-}
-
-func (app *unsyncLocalClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
-	res := app.Application.CheckTx(req)
-	return &res, nil
-}
-
-func (app *unsyncLocalClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) {
-	res := app.Application.Query(req)
-	return &res, nil
-}
-
-func (app *unsyncLocalClient) CommitSync() (*types.ResponseCommit, error) {
-	res := app.Application.Commit()
-	return &res, nil
-}
-
-func (app *unsyncLocalClient) InitChainSync(req types.RequestInitChain) (*types.ResponseInitChain, error) {
-	res := app.Application.InitChain(req)
-	return &res, nil
-}
-
-func (app *unsyncLocalClient) BeginBlockSync(req types.RequestBeginBlock) (*types.ResponseBeginBlock, error) {
-	res := app.Application.BeginBlock(req)
-	return &res, nil
-}
-
-func (app *unsyncLocalClient) EndBlockSync(req types.RequestEndBlock) (*types.ResponseEndBlock, error) {
-	res := app.Application.EndBlock(req)
-	return &res, nil
-}
-
-func (app *unsyncLocalClient) ListSnapshotsSync(req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
-	res := app.Application.ListSnapshots(req)
-	return &res, nil
-}
-
-func (app *unsyncLocalClient) OfferSnapshotSync(req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
-	res := app.Application.OfferSnapshot(req)
-	return &res, nil
-}
-
-func (app *unsyncLocalClient) LoadSnapshotChunkSync(
-	req types.RequestLoadSnapshotChunk,
-) (*types.ResponseLoadSnapshotChunk, error) {
-	res := app.Application.LoadSnapshotChunk(req)
-	return &res, nil
-}
-
-func (app *unsyncLocalClient) ApplySnapshotChunkSync(
-	req types.RequestApplySnapshotChunk,
-) (*types.ResponseApplySnapshotChunk, error) {
-	res := app.Application.ApplySnapshotChunk(req)
-	return &res, nil
-}
-
-func (app *unsyncLocalClient) PrepareProposalSync(req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
-	res := app.Application.PrepareProposal(req)
-	return &res, nil
-}
-
-func (app *unsyncLocalClient) ProcessProposalSync(req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
-	res := app.Application.ProcessProposal(req)
-	return &res, nil
-}
-
-//-------------------------------------------------------
-
-func (app *unsyncLocalClient) callback(req *types.Request, res *types.Response) *ReqRes {
-	app.mtx.RLock()
-	defer app.mtx.RUnlock()
-	app.Callback(req, res)
-	rr := newLocalReqRes(req, res)
-	rr.callbackInvoked = true
-	return rr
-}
@@ -26,8 +26,12 @@ type localClientCreator struct {
 	app types.Application
 }
 
-// NewLocalClientCreator returns a ClientCreator for the given app,
-// which will be running locally.
+// NewLocalClientCreator returns a [ClientCreator] for the given app, which
+// will be running locally.
+//
+// Maintains a single mutex over all new clients created with NewABCIClient.
+// For a local client creator that uses a single mutex per new client, rather
+// use [NewUnsyncLocalClientCreator].
 func NewLocalClientCreator(app types.Application) ClientCreator {
 	return &localClientCreator{
 		mtx: new(tmsync.Mutex),
@@ -39,24 +43,26 @@ func (l *localClientCreator) NewABCIClient() (abcicli.Client, error) {
 	return abcicli.NewLocalClient(l.mtx, l.app), nil
 }
 
-//---------------------------------------------------------------
-// unsynchronized local proxy on an in-proc app (no mutex)
+//----------------------------------------------------
+// local proxy creates a new mutex for each client
 
 type unsyncLocalClientCreator struct {
 	app types.Application
 }
 
-// NewUnsyncLocalClientCreator returns a ClientCreator for the given app, which
-// will be running locally. Unlike NewLocalClientCreator, this leaves
-// synchronization up to the application.
+// NewUnsyncLocalClientCreator returns a [ClientCreator] for the given app.
+// Unlike [NewLocalClientCreator], each call to NewABCIClient returns an ABCI
+// client that maintains its own mutex over the application.
 func NewUnsyncLocalClientCreator(app types.Application) ClientCreator {
 	return &unsyncLocalClientCreator{
 		app: app,
 	}
 }
 
-func (l *unsyncLocalClientCreator) NewABCIClient() (abcicli.Client, error) {
-	return abcicli.NewUnsyncLocalClient(l.app), nil
+func (c *unsyncLocalClientCreator) NewABCIClient() (abcicli.Client, error) {
+	// Specifying nil for the mutex causes each instance to create its own
+	// mutex.
+	return abcicli.NewLocalClient(nil, c.app), nil
 }
 
 //---------------------------------------------------------------
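
For context on the hunk above, a minimal sketch of how the new creator is meant to be used (illustrative only, not from this diff; the four connection names are the conventional ones, and the wiring is simplified compared to the node's real multi-connection plumbing):

package main

import (
	"fmt"

	abcicli "github.com/tendermint/tendermint/abci/client"
	"github.com/tendermint/tendermint/abci/example/kvstore"
	"github.com/tendermint/tendermint/proxy"
)

func main() {
	creator := proxy.NewUnsyncLocalClientCreator(kvstore.NewApplication())

	// The node conventionally opens four ABCI "connections". With this
	// creator each one is backed by a local client holding its own mutex,
	// so the connections no longer serialize against each other; the
	// application itself must be safe for concurrent use.
	conns := map[string]abcicli.Client{}
	for _, name := range []string{"consensus", "mempool", "query", "snapshot"} {
		cli, err := creator.NewABCIClient()
		if err != nil {
			panic(err)
		}
		conns[name] = cli
	}
	fmt.Println("created", len(conns), "independently locked clients")
}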
@@ -88,23 +94,33 @@ func (r *remoteClientCreator) NewABCIClient() (abcicli.Client, error) {
 	return remoteApp, nil
 }
 
-// DefaultClientCreator returns a default ClientCreator, which will create a
-// local client if addr is one of: 'kvstore',
-// 'persistent_kvstore' or 'noop', otherwise - a remote client.
+// DefaultClientCreator returns a default [ClientCreator], which will create a
+// local client if addr is one of "kvstore", "persistent_kvstore", "e2e",
+// "noop".
+//
+// Otherwise a remote client will be created.
+//
+// Each of "kvstore", "persistent_kvstore" and "e2e" also currently have an
+// "_unsync" variant (i.e. "kvstore_unsync", etc.), which attempts to replicate
+// the same concurrency model as the remote client.
 func DefaultClientCreator(addr, transport, dbDir string) ClientCreator {
 	switch addr {
 	case "kvstore":
 		return NewLocalClientCreator(kvstore.NewApplication())
+	case "kvstore_unsync":
+		return NewUnsyncLocalClientCreator(kvstore.NewApplication())
 	case "persistent_kvstore":
 		return NewLocalClientCreator(kvstore.NewPersistentKVStoreApplication(dbDir))
+	case "persistent_kvstore_unsync":
+		return NewUnsyncLocalClientCreator(kvstore.NewPersistentKVStoreApplication(dbDir))
 	case "e2e":
 		app, err := e2e.NewApplication(e2e.DefaultConfig(dbDir))
 		if err != nil {
 			panic(err)
 		}
 		return NewLocalClientCreator(app)
-	case "e2e_sync":
-		app, err := e2e.NewSyncApplication(e2e.DefaultConfig(dbDir))
+	case "e2e_unsync":
+		app, err := e2e.NewApplication(e2e.DefaultConfig(dbDir))
 		if err != nil {
 			panic(err)
 		}
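
As a usage sketch (illustrative only; the address strings come from the switch above, while the dbDir value here is an arbitrary placeholder):

package main

import (
	"fmt"

	"github.com/tendermint/tendermint/proxy"
)

func main() {
	// "kvstore_unsync" selects the in-process kvstore app wrapped in the
	// unsynchronized local client creator; plain "kvstore" keeps the single
	// shared-mutex behavior.
	cc := proxy.DefaultClientCreator("kvstore_unsync", "socket", "/tmp/kvstore")

	cli, err := cc.NewABCIClient()
	if err != nil {
		panic(err)
	}
	fmt.Printf("ABCI client type: %T\n", cli)
}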
@@ -88,7 +88,7 @@ func DefaultConfig(dir string) *Config {
 }
 
 // NewApplication creates the application.
-func NewApplication(cfg *Config) (abci.Application, error) {
+func NewApplication(cfg *Config) (*Application, error) {
 	state, err := NewState(cfg.Dir, cfg.PersistInterval)
 	if err != nil {
 		return nil, err
@@ -1,111 +0,0 @@
-package app
-
-import (
-	"sync"
-
-	abci "github.com/tendermint/tendermint/abci/types"
-)
-
-// SyncApplication wraps an Application, managing its own synchronization. This
-// allows it to be called from an unsynchronized local client, as it is
-// implemented in a thread-safe way.
-type SyncApplication struct {
-	mtx sync.RWMutex
-	app *Application
-}
-
-var _ abci.Application = (*SyncApplication)(nil)
-
-func NewSyncApplication(cfg *Config) (abci.Application, error) {
-	app, err := NewApplication(cfg)
-	if err != nil {
-		return nil, err
-	}
-	return &SyncApplication{
-		app: app.(*Application),
-	}, nil
-}
-
-func (app *SyncApplication) Info(req abci.RequestInfo) abci.ResponseInfo {
-	app.mtx.RLock()
-	defer app.mtx.RUnlock()
-	return app.app.Info(req)
-}
-
-func (app *SyncApplication) InitChain(req abci.RequestInitChain) abci.ResponseInitChain {
-	app.mtx.Lock()
-	defer app.mtx.Unlock()
-	return app.app.InitChain(req)
-}
-
-func (app *SyncApplication) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
-	app.mtx.RLock()
-	defer app.mtx.RUnlock()
-	return app.app.CheckTx(req)
-}
-
-func (app *SyncApplication) PrepareProposal(req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
-	// app.app.PrepareProposal does not modify state
-	app.mtx.RLock()
-	defer app.mtx.RUnlock()
-	return app.app.PrepareProposal(req)
-}
-
-func (app *SyncApplication) ProcessProposal(req abci.RequestProcessProposal) abci.ResponseProcessProposal {
-	// app.app.ProcessProposal does not modify state
-	app.mtx.RLock()
-	defer app.mtx.RUnlock()
-	return app.app.ProcessProposal(req)
-}
-
-func (app *SyncApplication) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
-	app.mtx.Lock()
-	defer app.mtx.Unlock()
-	return app.app.DeliverTx(req)
-}
-
-func (app *SyncApplication) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock {
-	app.mtx.Lock()
-	defer app.mtx.Unlock()
-	return app.app.BeginBlock(req)
-}
-
-func (app *SyncApplication) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
-	app.mtx.Lock()
-	defer app.mtx.Unlock()
-	return app.app.EndBlock(req)
-}
-
-func (app *SyncApplication) Commit() abci.ResponseCommit {
-	app.mtx.Lock()
-	defer app.mtx.Unlock()
-	return app.app.Commit()
-}
-
-func (app *SyncApplication) Query(req abci.RequestQuery) abci.ResponseQuery {
-	app.mtx.RLock()
-	defer app.mtx.RUnlock()
-	return app.app.Query(req)
-}
-
-func (app *SyncApplication) ApplySnapshotChunk(req abci.RequestApplySnapshotChunk) abci.ResponseApplySnapshotChunk {
-	app.mtx.Lock()
-	defer app.mtx.Unlock()
-	return app.app.ApplySnapshotChunk(req)
-}
-
-func (app *SyncApplication) ListSnapshots(req abci.RequestListSnapshots) abci.ResponseListSnapshots {
-	// Calls app.snapshots.List(), which is thread-safe.
-	return app.app.ListSnapshots(req)
-}
-
-func (app *SyncApplication) LoadSnapshotChunk(req abci.RequestLoadSnapshotChunk) abci.ResponseLoadSnapshotChunk {
-	// Calls app.snapshots.LoadChunk, which is thread-safe.
-	return app.app.LoadSnapshotChunk(req)
-}
-
-func (app *SyncApplication) OfferSnapshot(req abci.RequestOfferSnapshot) abci.ResponseOfferSnapshot {
-	app.mtx.Lock()
-	defer app.mtx.Unlock()
-	return app.app.OfferSnapshot(req)
-}
@@ -29,7 +29,7 @@ var (
 	nodeDatabases = uniformChoice{"goleveldb", "cleveldb", "rocksdb", "boltdb", "badgerdb"}
 	ipv6 = uniformChoice{false, true}
 	// FIXME: grpc disabled due to https://github.com/tendermint/tendermint/issues/5439
-	nodeABCIProtocols = uniformChoice{"unix", "tcp", "builtin"} // "grpc"
+	nodeABCIProtocols = uniformChoice{"unix", "tcp", "builtin", "builtin_unsync"} // "grpc"
 	nodePrivvalProtocols = uniformChoice{"file", "unix", "tcp"}
 	nodeBlockSyncs = uniformChoice{"v0"} // "v2"
 	nodeStateSyncs = uniformChoice{false, true}
@@ -110,7 +110,7 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
 	// First we generate seed nodes, starting at the initial height.
 	for i := 1; i <= numSeeds; i++ {
 		manifest.Nodes[fmt.Sprintf("seed%02d", i)] = generateNode(
-			r, e2e.ModeSeed, false, 0, manifest.InitialHeight, false)
+			r, e2e.ModeSeed, 0, manifest.InitialHeight, false)
 	}
 
 	// Next, we generate validators. We make sure a BFT quorum of validators start
@@ -125,12 +125,8 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
 			nextStartAt += 5
 		}
 		name := fmt.Sprintf("validator%02d", i)
-		syncApp := false
-		if manifest.ABCIProtocol == string(e2e.ProtocolBuiltin) {
-			syncApp = r.Intn(100) >= 50
-		}
 		manifest.Nodes[name] = generateNode(
-			r, e2e.ModeValidator, syncApp, startAt, manifest.InitialHeight, i <= 2)
+			r, e2e.ModeValidator, startAt, manifest.InitialHeight, i <= 2)
 
 		if startAt == 0 {
 			(*manifest.Validators)[name] = int64(30 + r.Intn(71))
@@ -158,12 +154,8 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
 			startAt = nextStartAt
 			nextStartAt += 5
 		}
-		syncApp := false
-		if manifest.ABCIProtocol == string(e2e.ProtocolBuiltin) {
-			syncApp = r.Intn(100) >= 50
-		}
 		manifest.Nodes[fmt.Sprintf("full%02d", i)] = generateNode(
-			r, e2e.ModeFull, syncApp, startAt, manifest.InitialHeight, false)
+			r, e2e.ModeFull, startAt, manifest.InitialHeight, false)
 	}
 
 	// We now set up peer discovery for nodes. Seed nodes are fully meshed with
@@ -226,12 +218,11 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
 // here, since we need to know the overall network topology and startup
 // sequencing.
 func generateNode(
-	r *rand.Rand, mode e2e.Mode, syncApp bool, startAt int64, initialHeight int64, forceArchive bool,
+	r *rand.Rand, mode e2e.Mode, startAt int64, initialHeight int64, forceArchive bool,
 ) *e2e.ManifestNode {
 	node := e2e.ManifestNode{
 		Version: nodeVersions.Choose(r).(string),
 		Mode: string(mode),
-		SyncApp: syncApp,
 		StartAt: startAt,
 		Database: nodeDatabases.Choose(r).(string),
 		PrivvalProtocol: nodePrivvalProtocols.Choose(r).(string),
@@ -60,7 +60,6 @@ perturb = ["kill"]
 persistent_peers = ["validator01"]
 database = "rocksdb"
 abci_protocol = "builtin"
-sync_app = true
 perturb = ["pause"]
 
 [node.validator05]
@@ -7,7 +7,6 @@ import (
 	"github.com/BurntSushi/toml"
 
-	"github.com/tendermint/tendermint/test/e2e/app"
 	e2e "github.com/tendermint/tendermint/test/e2e/pkg"
 )
 
 // Config is the application configuration.
@@ -17,7 +16,6 @@ type Config struct {
 	Protocol string `toml:"protocol"`
 	Dir string `toml:"dir"`
 	Mode string `toml:"mode"`
-	SyncApp bool `toml:"sync_app"`
 	PersistInterval uint64 `toml:"persist_interval"`
 	SnapshotInterval uint64 `toml:"snapshot_interval"`
 	RetainBlocks uint64 `toml:"retain_blocks"`
@@ -62,12 +60,8 @@ func (cfg Config) Validate() error {
 	switch {
 	case cfg.ChainID == "":
 		return errors.New("chain_id parameter is required")
-	case cfg.Listen == "" && cfg.Protocol != "builtin":
+	case cfg.Listen == "" && cfg.Protocol != "builtin" && cfg.Protocol != "builtin_unsync":
 		return errors.New("listen parameter is required")
-	case cfg.SyncApp && cfg.Protocol != string(e2e.ProtocolBuiltin):
-		return errors.New("sync_app parameter is only relevant for builtin applications")
-	case cfg.SyncApp && cfg.Mode != string(e2e.ModeFull) && cfg.Mode != string(e2e.ModeValidator):
-		return errors.New("sync_app parameter is only relevant to full nodes and validators")
 	default:
 		return nil
 	}
@@ -62,7 +62,7 @@ func run(configFile string) error {
 		if err = startSigner(cfg); err != nil {
 			return err
 		}
-		if cfg.Protocol == "builtin" {
+		if cfg.Protocol == "builtin" || cfg.Protocol == "builtin_unsync" {
 			time.Sleep(1 * time.Second)
 		}
 	}
@@ -71,7 +71,7 @@ func run(configFile string) error {
 	switch cfg.Protocol {
 	case "socket", "grpc":
 		err = startApp(cfg)
-	case "builtin":
+	case "builtin", "builtin_unsync":
 		if cfg.Mode == string(e2e.ModeLight) {
 			err = startLightClient(cfg)
 		} else {
@@ -113,22 +113,9 @@ func startApp(cfg *Config) error {
 //
 // FIXME There is no way to simply load the configuration from a file, so we need to pull in Viper.
 func startNode(cfg *Config) error {
-	var cc proxy.ClientCreator
-
-	if cfg.SyncApp {
-		app, err := app.NewSyncApplication(cfg.App())
-		if err != nil {
-			return err
-		}
-		cc = proxy.NewUnsyncLocalClientCreator(app)
-		logger.Info("Using synchronized app with unsynchronized local client")
-	} else {
-		app, err := app.NewApplication(cfg.App())
-		if err != nil {
-			return err
-		}
-		cc = proxy.NewLocalClientCreator(app)
-		logger.Info("Using regular app with synchronized (regular) local client")
-	}
+	app, err := app.NewApplication(cfg.App())
+	if err != nil {
+		return err
+	}
 
 	tmcfg, nodeLogger, nodeKey, err := setupNode()
@@ -136,10 +123,19 @@ func startNode(cfg *Config) error {
 		return fmt.Errorf("failed to setup config: %w", err)
 	}
 
+	var clientCreator proxy.ClientCreator
+	if cfg.Protocol == string(e2e.ProtocolBuiltinUnsync) {
+		clientCreator = proxy.NewUnsyncLocalClientCreator(app)
+		nodeLogger.Info("Using unsynchronized local client creator")
+	} else {
+		clientCreator = proxy.NewLocalClientCreator(app)
+		nodeLogger.Info("Using default (synchronized) local client creator")
+	}
+
 	n, err := node.NewNode(tmcfg,
 		privval.LoadOrGenFilePV(tmcfg.PrivValidatorKeyFile(), tmcfg.PrivValidatorStateFile()),
 		nodeKey,
-		cc,
+		clientCreator,
 		node.DefaultGenesisDocProviderFunc(tmcfg),
 		config.DefaultDBProvider,
 		node.DefaultMetricsProvider(tmcfg.Instrumentation),
@@ -26,7 +26,7 @@ func (p *Provider) Setup() error {
 	}
 	//nolint: gosec
 	// G306: Expect WriteFile permissions to be 0600 or less
-	err = os.WriteFile(filepath.Join(p.Testnet.Dir, "docker-compose.yml"), compose, 0644)
+	err = os.WriteFile(filepath.Join(p.Testnet.Dir, "docker-compose.yml"), compose, 0o644)
 	if err != nil {
 		return err
 	}
@@ -60,6 +60,9 @@ services:
     image: tendermint/e2e-node:{{ .Version }}
 {{- if eq .ABCIProtocol "builtin" }}
     entrypoint: /usr/bin/entrypoint-builtin
+{{- else }}{{ if eq .ABCIProtocol "builtin_unsync" }}
+    entrypoint: /usr/bin/entrypoint-builtin
+{{- end }}
 {{- end }}
     init: true
     ports:
@@ -57,9 +57,15 @@ type Manifest struct {
 	Evidence int `toml:"evidence"`
 
 	// ABCIProtocol specifies the protocol used to communicate with the ABCI
-	// application: "unix", "tcp", "grpc", or "builtin". Defaults to builtin.
-	// builtin will build a complete Tendermint node into the application and
-	// launch it instead of launching a separate Tendermint process.
+	// application: "unix", "tcp", "grpc", "builtin" or "builtin_unsync".
+	//
+	// Defaults to "builtin". "builtin" will build a complete Tendermint node
+	// into the application and launch it instead of launching a separate
+	// Tendermint process.
+	//
+	// "builtin_unsync" is basically the same as "builtin", except that it uses
+	// an "unsynchronized" local client creator, which attempts to replicate the
+	// same concurrency model locally as the socket client.
 	ABCIProtocol string `toml:"abci_protocol"`
 
 	// Add artificial delays to each of the main ABCI calls to mimic computation time
@@ -88,15 +94,6 @@ type ManifestNode struct {
 	// on the machine where the test is being run.
 	Version string `toml:"version"`
 
-	// SyncApp specifies whether this node should use a synchronized application
-	// with an unsynchronized local client. By default this is `false`, meaning
-	// that the node will run an unsynchronized application with a synchronized
-	// local client.
-	//
-	// Only applies to validators and full nodes where their ABCI protocol is
-	// "builtin".
-	SyncApp bool `toml:"sync_app"`
-
 	// Seeds is the list of node names to use as P2P seed nodes. Defaults to none.
 	Seeds []string `toml:"seeds"`
 
@@ -39,11 +39,12 @@ const (
 	ModeLight Mode = "light"
 	ModeSeed Mode = "seed"
 
-	ProtocolBuiltin Protocol = "builtin"
-	ProtocolFile    Protocol = "file"
-	ProtocolGRPC    Protocol = "grpc"
-	ProtocolTCP     Protocol = "tcp"
-	ProtocolUNIX    Protocol = "unix"
+	ProtocolBuiltin       Protocol = "builtin"
+	ProtocolBuiltinUnsync Protocol = "builtin_unsync"
+	ProtocolFile          Protocol = "file"
+	ProtocolGRPC          Protocol = "grpc"
+	ProtocolTCP           Protocol = "tcp"
+	ProtocolUNIX          Protocol = "unix"
 
 	PerturbationDisconnect Perturbation = "disconnect"
 	PerturbationKill Perturbation = "kill"
@@ -82,7 +83,6 @@ type Node struct {
 	Version string
 	Testnet *Testnet
 	Mode Mode
-	SyncApp bool // Should we use a synchronized app with an unsynchronized local client?
 	PrivvalKey crypto.PrivKey
 	NodeKey crypto.PrivKey
 	IP net.IP
@@ -183,7 +183,6 @@ func LoadTestnet(manifest Manifest, fname string, ifd InfrastructureData) (*Test
 		IP: ind.IPAddress,
 		ProxyPort: proxyPortGen.Next(),
 		Mode: ModeValidator,
-		SyncApp: nodeManifest.SyncApp,
 		Database: "goleveldb",
 		ABCIProtocol: Protocol(testnet.ABCIProtocol),
 		PrivvalProtocol: ProtocolFile,
@@ -347,11 +346,11 @@ func (n Node) Validate(testnet Testnet) error {
 		return fmt.Errorf("invalid database setting %q", n.Database)
 	}
 	switch n.ABCIProtocol {
-	case ProtocolBuiltin, ProtocolUNIX, ProtocolTCP, ProtocolGRPC:
+	case ProtocolBuiltin, ProtocolBuiltinUnsync, ProtocolUNIX, ProtocolTCP, ProtocolGRPC:
 	default:
 		return fmt.Errorf("invalid ABCI protocol setting %q", n.ABCIProtocol)
 	}
-	if n.Mode == ModeLight && n.ABCIProtocol != ProtocolBuiltin {
+	if n.Mode == ModeLight && n.ABCIProtocol != ProtocolBuiltin && n.ABCIProtocol != ProtocolBuiltinUnsync {
 		return errors.New("light client must use builtin protocol")
 	}
 	switch n.PrivvalProtocol {
@@ -174,7 +174,7 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
 	case e2e.ProtocolGRPC:
 		cfg.ProxyApp = AppAddressTCP
 		cfg.ABCI = "grpc"
-	case e2e.ProtocolBuiltin:
+	case e2e.ProtocolBuiltin, e2e.ProtocolBuiltinUnsync:
 		cfg.ProxyApp = ""
 		cfg.ABCI = ""
 	default:
@@ -258,7 +258,6 @@ func MakeAppConfig(node *e2e.Node) ([]byte, error) {
 		"dir": "data/app",
 		"listen": AppAddressUNIX,
 		"mode": node.Mode,
-		"sync_app": node.SyncApp,
 		"proxy_port": node.ProxyPort,
 		"protocol": "socket",
 		"persist_interval": node.PersistInterval,
@@ -277,9 +276,9 @@ func MakeAppConfig(node *e2e.Node) ([]byte, error) {
 	case e2e.ProtocolGRPC:
 		cfg["listen"] = AppAddressTCP
 		cfg["protocol"] = "grpc"
-	case e2e.ProtocolBuiltin:
+	case e2e.ProtocolBuiltin, e2e.ProtocolBuiltinUnsync:
 		delete(cfg, "listen")
-		cfg["protocol"] = "builtin"
+		cfg["protocol"] = string(node.ABCIProtocol)
 	default:
 		return nil, fmt.Errorf("unexpected ABCI protocol setting %q", node.ABCIProtocol)
 	}