mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-05 13:05:09 +00:00
Adds the ABCI interface for [state sync](https://github.com/tendermint/tendermint/issues/828) as outlined in [ADR-053](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-053-state-sync-prototype.md), and bumps ABCIVersion to `0.17.0`. The interface adds a new ABCI connection which Tendermint can use to query and load snapshots from the app (for serving snapshots to other nodes), and to offer and apply snapshots to the app (for state syncing a local node from peers). This change was split out from the original PR (#4645); the state sync reactor will be submitted as a separate PR. The interface is implemented by the Cosmos SDK in https://github.com/cosmos/cosmos-sdk/pull/5803.
138 lines
4.1 KiB
Go
138 lines
4.1 KiB
Go
package abcicli
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/tendermint/tendermint/abci/types"
|
|
"github.com/tendermint/tendermint/libs/service"
|
|
)
|
|
|
|
// Retry intervals, expressed in seconds as their names indicate.
// NOTE(review): the code that consumes these constants is not in this file;
// presumably the transport-specific clients use them — confirm against callers.
const (
	// dialRetryIntervalSeconds is the dial retry interval, in seconds.
	dialRetryIntervalSeconds = 3
	// echoRetryIntervalSeconds is the echo retry interval, in seconds.
	echoRetryIntervalSeconds = 1
)
|
|
|
|
// Client defines an interface for an ABCI client.
|
|
// All `Async` methods return a `ReqRes` object.
|
|
// All `Sync` methods return the appropriate protobuf ResponseXxx struct and an error.
|
|
// Note these are client errors, eg. ABCI socket connectivity issues.
|
|
// Application-related errors are reflected in response via ABCI error codes and logs.
|
|
type Client interface {
|
|
service.Service
|
|
|
|
SetResponseCallback(Callback)
|
|
Error() error
|
|
|
|
FlushAsync() *ReqRes
|
|
EchoAsync(msg string) *ReqRes
|
|
InfoAsync(types.RequestInfo) *ReqRes
|
|
SetOptionAsync(types.RequestSetOption) *ReqRes
|
|
DeliverTxAsync(types.RequestDeliverTx) *ReqRes
|
|
CheckTxAsync(types.RequestCheckTx) *ReqRes
|
|
QueryAsync(types.RequestQuery) *ReqRes
|
|
CommitAsync() *ReqRes
|
|
InitChainAsync(types.RequestInitChain) *ReqRes
|
|
BeginBlockAsync(types.RequestBeginBlock) *ReqRes
|
|
EndBlockAsync(types.RequestEndBlock) *ReqRes
|
|
ListSnapshotsAsync(types.RequestListSnapshots) *ReqRes
|
|
OfferSnapshotAsync(types.RequestOfferSnapshot) *ReqRes
|
|
LoadSnapshotChunkAsync(types.RequestLoadSnapshotChunk) *ReqRes
|
|
ApplySnapshotChunkAsync(types.RequestApplySnapshotChunk) *ReqRes
|
|
|
|
FlushSync() error
|
|
EchoSync(msg string) (*types.ResponseEcho, error)
|
|
InfoSync(types.RequestInfo) (*types.ResponseInfo, error)
|
|
SetOptionSync(types.RequestSetOption) (*types.ResponseSetOption, error)
|
|
DeliverTxSync(types.RequestDeliverTx) (*types.ResponseDeliverTx, error)
|
|
CheckTxSync(types.RequestCheckTx) (*types.ResponseCheckTx, error)
|
|
QuerySync(types.RequestQuery) (*types.ResponseQuery, error)
|
|
CommitSync() (*types.ResponseCommit, error)
|
|
InitChainSync(types.RequestInitChain) (*types.ResponseInitChain, error)
|
|
BeginBlockSync(types.RequestBeginBlock) (*types.ResponseBeginBlock, error)
|
|
EndBlockSync(types.RequestEndBlock) (*types.ResponseEndBlock, error)
|
|
ListSnapshotsSync(types.RequestListSnapshots) (*types.ResponseListSnapshots, error)
|
|
OfferSnapshotSync(types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error)
|
|
LoadSnapshotChunkSync(types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error)
|
|
ApplySnapshotChunkSync(types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error)
|
|
}
|
|
|
|
//----------------------------------------
|
|
|
|
// NewClient returns a new ABCI client of the specified transport type.
|
|
// It returns an error if the transport is not "socket" or "grpc"
|
|
func NewClient(addr, transport string, mustConnect bool) (client Client, err error) {
|
|
switch transport {
|
|
case "socket":
|
|
client = NewSocketClient(addr, mustConnect)
|
|
case "grpc":
|
|
client = NewGRPCClient(addr, mustConnect)
|
|
default:
|
|
err = fmt.Errorf("unknown abci transport %s", transport)
|
|
}
|
|
return
|
|
}
|
|
|
|
//----------------------------------------
|
|
|
|
// Callback is a function that receives a request together with a response.
// It is the argument type of Client.SetResponseCallback.
type Callback func(*types.Request, *types.Response)
|
|
|
|
//----------------------------------------
|
|
|
|
type ReqRes struct {
|
|
*types.Request
|
|
*sync.WaitGroup
|
|
*types.Response // Not set atomically, so be sure to use WaitGroup.
|
|
|
|
mtx sync.Mutex
|
|
done bool // Gets set to true once *after* WaitGroup.Done().
|
|
cb func(*types.Response) // A single callback that may be set.
|
|
}
|
|
|
|
func NewReqRes(req *types.Request) *ReqRes {
|
|
return &ReqRes{
|
|
Request: req,
|
|
WaitGroup: waitGroup1(),
|
|
Response: nil,
|
|
|
|
done: false,
|
|
cb: nil,
|
|
}
|
|
}
|
|
|
|
// Sets the callback for this ReqRes atomically.
|
|
// If reqRes is already done, calls cb immediately.
|
|
// NOTE: reqRes.cb should not change if reqRes.done.
|
|
// NOTE: only one callback is supported.
|
|
func (reqRes *ReqRes) SetCallback(cb func(res *types.Response)) {
|
|
reqRes.mtx.Lock()
|
|
|
|
if reqRes.done {
|
|
reqRes.mtx.Unlock()
|
|
cb(reqRes.Response)
|
|
return
|
|
}
|
|
|
|
reqRes.cb = cb
|
|
reqRes.mtx.Unlock()
|
|
}
|
|
|
|
func (reqRes *ReqRes) GetCallback() func(*types.Response) {
|
|
reqRes.mtx.Lock()
|
|
defer reqRes.mtx.Unlock()
|
|
return reqRes.cb
|
|
}
|
|
|
|
// NOTE: it should be safe to read reqRes.cb without locks after this.
|
|
func (reqRes *ReqRes) SetDone() {
|
|
reqRes.mtx.Lock()
|
|
reqRes.done = true
|
|
reqRes.mtx.Unlock()
|
|
}
|
|
|
|
// waitGroup1 returns a freshly allocated WaitGroup whose counter has already
// been incremented to 1, so a single Done call releases any waiters.
func waitGroup1() *sync.WaitGroup {
	wg := new(sync.WaitGroup)
	wg.Add(1)
	return wg
}
|