abci: Add unsynchronized local client (#9660)

* Remove extra interface cast

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* Remove irrelevant comment

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* abci: Add unsynchronized local client

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* proxy: Add unsync local client creator

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* e2e: Add sync app for use with unsync local client

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* abci: Elaborate on mutex param in unsync local client

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* proxy: Remove unnecessary comment

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* abcicli: Remove unnecessary mutex param from unsync client

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* ci/e2e: Explicitly use sync app for validator04

Signed-off-by: Thane Thomson <connect@thanethomson.com>

* e2e: Ensure app is definitely the E2E app

Signed-off-by: Thane Thomson <connect@thanethomson.com>

Signed-off-by: Thane Thomson <connect@thanethomson.com>
This commit is contained in:
Thane Thomson
2022-11-07 06:46:55 -05:00
committed by GitHub
parent 5a9a84eb02
commit 45071d1f23
12 changed files with 460 additions and 21 deletions

View File

@@ -6,8 +6,6 @@ import (
tmsync "github.com/tendermint/tendermint/libs/sync"
)
var _ Client = (*localClient)(nil)
// NOTE: use defer to unlock mutex because Application might panic (e.g., in
// case of malicious tx or query). It only makes sense for publicly exposed
// methods like CheckTx (/broadcast_tx_* RPC endpoint) or Query (/abci_query
@@ -24,8 +22,6 @@ var _ Client = (*localClient)(nil)
// NewLocalClient creates a local client, which will be directly calling the
// methods of the given app.
//
// Both Async and Sync methods ignore the given context.Context parameter.
func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client {
if mtx == nil {
mtx = new(tmsync.Mutex)
@@ -309,7 +305,8 @@ func (app *localClient) OfferSnapshotSync(req types.RequestOfferSnapshot) (*type
}
func (app *localClient) LoadSnapshotChunkSync(
req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
req types.RequestLoadSnapshotChunk,
) (*types.ResponseLoadSnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
@@ -318,7 +315,8 @@ func (app *localClient) LoadSnapshotChunkSync(
}
func (app *localClient) ApplySnapshotChunkSync(
req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
req types.RequestApplySnapshotChunk,
) (*types.ResponseApplySnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

View File

@@ -0,0 +1,263 @@
package abcicli
import (
"sync"
types "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/service"
)
type unsyncLocalClient struct {
service.BaseService
types.Application
// This mutex is exclusively used to protect the callback.
mtx sync.RWMutex
Callback
}
var _ Client = (*unsyncLocalClient)(nil)
// NewUnsyncLocalClient creates an unsynchronized local client, which will be
// directly calling the methods of the given app.
//
// Unlike NewLocalClient, it does not hold a mutex around the application, so
// it is up to the application to manage its synchronization properly.
func NewUnsyncLocalClient(app types.Application) Client {
cli := &unsyncLocalClient{
Application: app,
}
cli.BaseService = *service.NewBaseService(nil, "unsyncLocalClient", cli)
return cli
}
func (app *unsyncLocalClient) SetResponseCallback(cb Callback) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.Callback = cb
}
// TODO: change types.Application to include Error()?
func (app *unsyncLocalClient) Error() error {
return nil
}
func (app *unsyncLocalClient) FlushAsync() *ReqRes {
// Do nothing
return newLocalReqRes(types.ToRequestFlush(), nil)
}
func (app *unsyncLocalClient) EchoAsync(msg string) *ReqRes {
return app.callback(
types.ToRequestEcho(msg),
types.ToResponseEcho(msg),
)
}
func (app *unsyncLocalClient) InfoAsync(req types.RequestInfo) *ReqRes {
res := app.Application.Info(req)
return app.callback(
types.ToRequestInfo(req),
types.ToResponseInfo(res),
)
}
func (app *unsyncLocalClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes {
res := app.Application.DeliverTx(params)
return app.callback(
types.ToRequestDeliverTx(params),
types.ToResponseDeliverTx(res),
)
}
func (app *unsyncLocalClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
res := app.Application.CheckTx(req)
return app.callback(
types.ToRequestCheckTx(req),
types.ToResponseCheckTx(res),
)
}
func (app *unsyncLocalClient) QueryAsync(req types.RequestQuery) *ReqRes {
res := app.Application.Query(req)
return app.callback(
types.ToRequestQuery(req),
types.ToResponseQuery(res),
)
}
func (app *unsyncLocalClient) CommitAsync() *ReqRes {
res := app.Application.Commit()
return app.callback(
types.ToRequestCommit(),
types.ToResponseCommit(res),
)
}
func (app *unsyncLocalClient) InitChainAsync(req types.RequestInitChain) *ReqRes {
res := app.Application.InitChain(req)
return app.callback(
types.ToRequestInitChain(req),
types.ToResponseInitChain(res),
)
}
func (app *unsyncLocalClient) BeginBlockAsync(req types.RequestBeginBlock) *ReqRes {
res := app.Application.BeginBlock(req)
return app.callback(
types.ToRequestBeginBlock(req),
types.ToResponseBeginBlock(res),
)
}
func (app *unsyncLocalClient) EndBlockAsync(req types.RequestEndBlock) *ReqRes {
res := app.Application.EndBlock(req)
return app.callback(
types.ToRequestEndBlock(req),
types.ToResponseEndBlock(res),
)
}
func (app *unsyncLocalClient) ListSnapshotsAsync(req types.RequestListSnapshots) *ReqRes {
res := app.Application.ListSnapshots(req)
return app.callback(
types.ToRequestListSnapshots(req),
types.ToResponseListSnapshots(res),
)
}
func (app *unsyncLocalClient) OfferSnapshotAsync(req types.RequestOfferSnapshot) *ReqRes {
res := app.Application.OfferSnapshot(req)
return app.callback(
types.ToRequestOfferSnapshot(req),
types.ToResponseOfferSnapshot(res),
)
}
func (app *unsyncLocalClient) LoadSnapshotChunkAsync(req types.RequestLoadSnapshotChunk) *ReqRes {
res := app.Application.LoadSnapshotChunk(req)
return app.callback(
types.ToRequestLoadSnapshotChunk(req),
types.ToResponseLoadSnapshotChunk(res),
)
}
func (app *unsyncLocalClient) ApplySnapshotChunkAsync(req types.RequestApplySnapshotChunk) *ReqRes {
res := app.Application.ApplySnapshotChunk(req)
return app.callback(
types.ToRequestApplySnapshotChunk(req),
types.ToResponseApplySnapshotChunk(res),
)
}
func (app *unsyncLocalClient) PrepareProposalAsync(req types.RequestPrepareProposal) *ReqRes {
res := app.Application.PrepareProposal(req)
return app.callback(
types.ToRequestPrepareProposal(req),
types.ToResponsePrepareProposal(res),
)
}
func (app *unsyncLocalClient) ProcessProposalAsync(req types.RequestProcessProposal) *ReqRes {
res := app.Application.ProcessProposal(req)
return app.callback(
types.ToRequestProcessProposal(req),
types.ToResponseProcessProposal(res),
)
}
//-------------------------------------------------------
func (app *unsyncLocalClient) FlushSync() error {
return nil
}
func (app *unsyncLocalClient) EchoSync(msg string) (*types.ResponseEcho, error) {
return &types.ResponseEcho{Message: msg}, nil
}
func (app *unsyncLocalClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
res := app.Application.Info(req)
return &res, nil
}
func (app *unsyncLocalClient) DeliverTxSync(req types.RequestDeliverTx) (*types.ResponseDeliverTx, error) {
res := app.Application.DeliverTx(req)
return &res, nil
}
func (app *unsyncLocalClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
res := app.Application.CheckTx(req)
return &res, nil
}
func (app *unsyncLocalClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) {
res := app.Application.Query(req)
return &res, nil
}
func (app *unsyncLocalClient) CommitSync() (*types.ResponseCommit, error) {
res := app.Application.Commit()
return &res, nil
}
func (app *unsyncLocalClient) InitChainSync(req types.RequestInitChain) (*types.ResponseInitChain, error) {
res := app.Application.InitChain(req)
return &res, nil
}
func (app *unsyncLocalClient) BeginBlockSync(req types.RequestBeginBlock) (*types.ResponseBeginBlock, error) {
res := app.Application.BeginBlock(req)
return &res, nil
}
func (app *unsyncLocalClient) EndBlockSync(req types.RequestEndBlock) (*types.ResponseEndBlock, error) {
res := app.Application.EndBlock(req)
return &res, nil
}
func (app *unsyncLocalClient) ListSnapshotsSync(req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
res := app.Application.ListSnapshots(req)
return &res, nil
}
func (app *unsyncLocalClient) OfferSnapshotSync(req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
res := app.Application.OfferSnapshot(req)
return &res, nil
}
func (app *unsyncLocalClient) LoadSnapshotChunkSync(
req types.RequestLoadSnapshotChunk,
) (*types.ResponseLoadSnapshotChunk, error) {
res := app.Application.LoadSnapshotChunk(req)
return &res, nil
}
func (app *unsyncLocalClient) ApplySnapshotChunkSync(
req types.RequestApplySnapshotChunk,
) (*types.ResponseApplySnapshotChunk, error) {
res := app.Application.ApplySnapshotChunk(req)
return &res, nil
}
func (app *unsyncLocalClient) PrepareProposalSync(req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
res := app.Application.PrepareProposal(req)
return &res, nil
}
func (app *unsyncLocalClient) ProcessProposalSync(req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
res := app.Application.ProcessProposal(req)
return &res, nil
}
//-------------------------------------------------------
func (app *unsyncLocalClient) callback(req *types.Request, res *types.Response) *ReqRes {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.Callback(req, res)
rr := newLocalReqRes(req, res)
rr.callbackInvoked = true
return rr
}

View File

@@ -39,6 +39,26 @@ func (l *localClientCreator) NewABCIClient() (abcicli.Client, error) {
return abcicli.NewLocalClient(l.mtx, l.app), nil
}
//---------------------------------------------------------------
// unsynchronized local proxy on an in-proc app (no mutex)
type unsyncLocalClientCreator struct {
app types.Application
}
// NewUnsyncLocalClientCreator returns a ClientCreator for the given app, which
// will be running locally. Unlike NewLocalClientCreator, this leaves
// synchronization up to the application.
func NewUnsyncLocalClientCreator(app types.Application) ClientCreator {
return &unsyncLocalClientCreator{
app: app,
}
}
func (l *unsyncLocalClientCreator) NewABCIClient() (abcicli.Client, error) {
return abcicli.NewUnsyncLocalClient(l.app), nil
}
//---------------------------------------------------------------
// remote proxy opens new connections to an external app process
@@ -83,6 +103,12 @@ func DefaultClientCreator(addr, transport, dbDir string) ClientCreator {
panic(err)
}
return NewLocalClientCreator(app)
case "e2e_sync":
app, err := e2e.NewSyncApplication(e2e.DefaultConfig(dbDir))
if err != nil {
panic(err)
}
return NewUnsyncLocalClientCreator(app)
case "noop":
return NewLocalClientCreator(types.NewBaseApplication())
default:

View File

@@ -21,7 +21,6 @@ const appVersion = 1
// Application is an ABCI application for use by end-to-end tests. It is a
// simple key/value store for strings, storing data in memory and persisting
// to disk as JSON, taking state sync snapshots if requested.
type Application struct {
abci.BaseApplication
logger log.Logger
@@ -89,7 +88,7 @@ func DefaultConfig(dir string) *Config {
}
// NewApplication creates the application.
func NewApplication(cfg *Config) (*Application, error) {
func NewApplication(cfg *Config) (abci.Application, error) {
state, err := NewState(cfg.Dir, cfg.PersistInterval)
if err != nil {
return nil, err
@@ -271,7 +270,8 @@ func (app *Application) ApplySnapshotChunk(req abci.RequestApplySnapshotChunk) a
}
func (app *Application) PrepareProposal(
req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
req abci.RequestPrepareProposal,
) abci.ResponsePrepareProposal {
txs := make([][]byte, 0, len(req.Txs))
var totalBytes int64
for _, tx := range req.Txs {

111
test/e2e/app/sync_app.go Normal file
View File

@@ -0,0 +1,111 @@
package app
import (
"sync"
abci "github.com/tendermint/tendermint/abci/types"
)
// SyncApplication wraps an Application, managing its own synchronization. This
// allows it to be called from an unsynchronized local client, as it is
// implemented in a thread-safe way.
type SyncApplication struct {
mtx sync.RWMutex
app *Application
}
var _ abci.Application = (*SyncApplication)(nil)
func NewSyncApplication(cfg *Config) (abci.Application, error) {
app, err := NewApplication(cfg)
if err != nil {
return nil, err
}
return &SyncApplication{
app: app.(*Application),
}, nil
}
func (app *SyncApplication) Info(req abci.RequestInfo) abci.ResponseInfo {
app.mtx.RLock()
defer app.mtx.RUnlock()
return app.app.Info(req)
}
func (app *SyncApplication) InitChain(req abci.RequestInitChain) abci.ResponseInitChain {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.InitChain(req)
}
func (app *SyncApplication) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
app.mtx.RLock()
defer app.mtx.RUnlock()
return app.app.CheckTx(req)
}
func (app *SyncApplication) PrepareProposal(req abci.RequestPrepareProposal) abci.ResponsePrepareProposal {
// app.app.PrepareProposal does not modify state
app.mtx.RLock()
defer app.mtx.RUnlock()
return app.app.PrepareProposal(req)
}
func (app *SyncApplication) ProcessProposal(req abci.RequestProcessProposal) abci.ResponseProcessProposal {
// app.app.ProcessProposal does not modify state
app.mtx.RLock()
defer app.mtx.RUnlock()
return app.app.ProcessProposal(req)
}
func (app *SyncApplication) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.DeliverTx(req)
}
func (app *SyncApplication) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.BeginBlock(req)
}
func (app *SyncApplication) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.EndBlock(req)
}
func (app *SyncApplication) Commit() abci.ResponseCommit {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.Commit()
}
func (app *SyncApplication) Query(req abci.RequestQuery) abci.ResponseQuery {
app.mtx.RLock()
defer app.mtx.RUnlock()
return app.app.Query(req)
}
func (app *SyncApplication) ApplySnapshotChunk(req abci.RequestApplySnapshotChunk) abci.ResponseApplySnapshotChunk {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.ApplySnapshotChunk(req)
}
func (app *SyncApplication) ListSnapshots(req abci.RequestListSnapshots) abci.ResponseListSnapshots {
// Calls app.snapshots.List(), which is thread-safe.
return app.app.ListSnapshots(req)
}
func (app *SyncApplication) LoadSnapshotChunk(req abci.RequestLoadSnapshotChunk) abci.ResponseLoadSnapshotChunk {
// Calls app.snapshots.LoadChunk, which is thread-safe.
return app.app.LoadSnapshotChunk(req)
}
func (app *SyncApplication) OfferSnapshot(req abci.RequestOfferSnapshot) abci.ResponseOfferSnapshot {
app.mtx.Lock()
defer app.mtx.Unlock()
return app.app.OfferSnapshot(req)
}

View File

@@ -105,7 +105,7 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
// First we generate seed nodes, starting at the initial height.
for i := 1; i <= numSeeds; i++ {
manifest.Nodes[fmt.Sprintf("seed%02d", i)] = generateNode(
r, e2e.ModeSeed, 0, manifest.InitialHeight, false)
r, e2e.ModeSeed, false, 0, manifest.InitialHeight, false)
}
// Next, we generate validators. We make sure a BFT quorum of validators start
@@ -120,8 +120,12 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
nextStartAt += 5
}
name := fmt.Sprintf("validator%02d", i)
syncApp := false
if manifest.ABCIProtocol == string(e2e.ProtocolBuiltin) {
syncApp = r.Intn(100) >= 50
}
manifest.Nodes[name] = generateNode(
r, e2e.ModeValidator, startAt, manifest.InitialHeight, i <= 2)
r, e2e.ModeValidator, syncApp, startAt, manifest.InitialHeight, i <= 2)
if startAt == 0 {
(*manifest.Validators)[name] = int64(30 + r.Intn(71))
@@ -149,8 +153,12 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
startAt = nextStartAt
nextStartAt += 5
}
syncApp := false
if manifest.ABCIProtocol == string(e2e.ProtocolBuiltin) {
syncApp = r.Intn(100) >= 50
}
manifest.Nodes[fmt.Sprintf("full%02d", i)] = generateNode(
r, e2e.ModeFull, startAt, manifest.InitialHeight, false)
r, e2e.ModeFull, syncApp, startAt, manifest.InitialHeight, false)
}
// We now set up peer discovery for nodes. Seed nodes are fully meshed with
@@ -213,10 +221,11 @@ func generateTestnet(r *rand.Rand, opt map[string]interface{}) (e2e.Manifest, er
// here, since we need to know the overall network topology and startup
// sequencing.
func generateNode(
r *rand.Rand, mode e2e.Mode, startAt int64, initialHeight int64, forceArchive bool,
r *rand.Rand, mode e2e.Mode, syncApp bool, startAt int64, initialHeight int64, forceArchive bool,
) *e2e.ManifestNode {
node := e2e.ManifestNode{
Mode: string(mode),
SyncApp: syncApp,
StartAt: startAt,
Database: nodeDatabases.Choose(r).(string),
PrivvalProtocol: nodePrivvalProtocols.Choose(r).(string),

View File

@@ -60,6 +60,7 @@ perturb = ["kill"]
persistent_peers = ["validator01"]
database = "rocksdb"
abci_protocol = "builtin"
sync_app = true
perturb = ["pause"]
[node.validator05]

View File

@@ -7,15 +7,17 @@ import (
"github.com/BurntSushi/toml"
"github.com/tendermint/tendermint/test/e2e/app"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
)
// Config is the application configuration.
type Config struct {
ChainID string `toml:"chain_id"`
Listen string
Protocol string
Dir string
ChainID string `toml:"chain_id"`
Listen string `toml:"listen"`
Protocol string `toml:"protocol"`
Dir string `toml:"dir"`
Mode string `toml:"mode"`
SyncApp bool `toml:"sync_app"`
PersistInterval uint64 `toml:"persist_interval"`
SnapshotInterval uint64 `toml:"snapshot_interval"`
RetainBlocks uint64 `toml:"retain_blocks"`
@@ -62,6 +64,10 @@ func (cfg Config) Validate() error {
return errors.New("chain_id parameter is required")
case cfg.Listen == "" && cfg.Protocol != "builtin":
return errors.New("listen parameter is required")
case cfg.SyncApp && cfg.Protocol != string(e2e.ProtocolBuiltin):
return errors.New("sync_app parameter is only relevant for builtin applications")
case cfg.SyncApp && cfg.Mode != string(e2e.ModeFull) && cfg.Mode != string(e2e.ModeValidator):
return errors.New("sync_app parameter is only relevant to full nodes and validators")
default:
return nil
}

View File

@@ -113,9 +113,22 @@ func startApp(cfg *Config) error {
//
// FIXME There is no way to simply load the configuration from a file, so we need to pull in Viper.
func startNode(cfg *Config) error {
app, err := app.NewApplication(cfg.App())
if err != nil {
return err
var cc proxy.ClientCreator
if cfg.SyncApp {
app, err := app.NewSyncApplication(cfg.App())
if err != nil {
return err
}
cc = proxy.NewUnsyncLocalClientCreator(app)
logger.Info("Using synchronized app with unsynchronized local client")
} else {
app, err := app.NewApplication(cfg.App())
if err != nil {
return err
}
cc = proxy.NewLocalClientCreator(app)
logger.Info("Using regular app with synchronized (regular) local client")
}
tmcfg, nodeLogger, nodeKey, err := setupNode()
@@ -126,7 +139,7 @@ func startNode(cfg *Config) error {
n, err := node.NewNode(tmcfg,
privval.LoadOrGenFilePV(tmcfg.PrivValidatorKeyFile(), tmcfg.PrivValidatorStateFile()),
nodeKey,
proxy.NewLocalClientCreator(app),
cc,
node.DefaultGenesisDocProviderFunc(tmcfg),
node.DefaultDBProvider,
node.DefaultMetricsProvider(tmcfg.Instrumentation),

View File

@@ -77,6 +77,15 @@ type ManifestNode struct {
// is generated), and seed nodes run in seed mode with the PEX reactor enabled.
Mode string `toml:"mode"`
// SyncApp specifies whether this node should use a synchronized application
// with an unsynchronized local client. By default this is `false`, meaning
// that the node will run an unsynchronized application with a synchronized
// local client.
//
// Only applies to validators and full nodes where their ABCI protocol is
// "builtin".
SyncApp bool `toml:"sync_app"`
// Seeds is the list of node names to use as P2P seed nodes. Defaults to none.
Seeds []string `toml:"seeds"`

View File

@@ -74,6 +74,7 @@ type Node struct {
Name string
Testnet *Testnet
Mode Mode
SyncApp bool // Should we use a synchronized app with an unsynchronized local client?
PrivvalKey crypto.PrivKey
NodeKey crypto.PrivKey
IP net.IP
@@ -154,6 +155,7 @@ func LoadTestnet(manifest Manifest, fname string, ifd InfrastructureData) (*Test
IP: ind.IPAddress,
ProxyPort: proxyPortGen.Next(),
Mode: ModeValidator,
SyncApp: nodeManifest.SyncApp,
Database: "goleveldb",
ABCIProtocol: Protocol(testnet.ABCIProtocol),
PrivvalProtocol: ProtocolFile,

View File

@@ -258,6 +258,7 @@ func MakeAppConfig(node *e2e.Node) ([]byte, error) {
"dir": "data/app",
"listen": AppAddressUNIX,
"mode": node.Mode,
"sync_app": node.SyncApp,
"proxy_port": node.ProxyPort,
"protocol": "socket",
"persist_interval": node.PersistInterval,