Files
tendermint/abci/example/example_test.go
Anton Kaliaev e13b4386ff abci: modify Client interface and socket client (#5673)
`abci.Client`:

- Sync and Async methods now accept a context for cancellation
    * grpc client uses context to cancel both Sync and Async requests
    * local client ignores context parameter
    * socket client uses context to cancel Sync requests and to drop Async requests before sending them if context was cancelled prior to that

- Async methods return an error
    * socket client returns an error immediately if queue is full for Async requests
    * local client always returns nil error
    * grpc client returns an error if context was cancelled before we got response or the receiving queue had a space for response (do not confuse with the sending queue from the socket client)

- specify clients semantics in [doc.go](https://raw.githubusercontent.com/tendermint/tendermint/27112fffa62276bc016d56741f686f0f77931748/abci/client/doc.go)

`mempool.TxInfo`

- add optional `Context` to `TxInfo`, which can be used to cancel `CheckTx` request

Closes #5190
2020-11-30 16:46:16 +04:00

186 lines
4.5 KiB
Go

package example
import (
"context"
"fmt"
"math/rand"
"net"
"os"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"github.com/tendermint/tendermint/libs/log"
tmnet "github.com/tendermint/tendermint/libs/net"
abcicli "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/example/code"
"github.com/tendermint/tendermint/abci/example/kvstore"
abciserver "github.com/tendermint/tendermint/abci/server"
"github.com/tendermint/tendermint/abci/types"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
func TestKVStore(t *testing.T) {
fmt.Println("### Testing KVStore")
testStream(t, kvstore.NewApplication())
}
func TestBaseApp(t *testing.T) {
fmt.Println("### Testing BaseApp")
testStream(t, types.NewBaseApplication())
}
func TestGRPC(t *testing.T) {
fmt.Println("### Testing GRPC")
testGRPCSync(t, types.NewGRPCApplication(types.NewBaseApplication()))
}
func testStream(t *testing.T, app types.Application) {
const numDeliverTxs = 20000
socketFile := fmt.Sprintf("test-%08x.sock", rand.Int31n(1<<30))
defer os.Remove(socketFile)
socket := fmt.Sprintf("unix://%v", socketFile)
// Start the listener
server := abciserver.NewSocketServer(socket, app)
server.SetLogger(log.TestingLogger().With("module", "abci-server"))
err := server.Start()
require.NoError(t, err)
t.Cleanup(func() {
if err := server.Stop(); err != nil {
t.Error(err)
}
})
// Connect to the socket
client := abcicli.NewSocketClient(socket, false)
client.SetLogger(log.TestingLogger().With("module", "abci-client"))
err = client.Start()
require.NoError(t, err)
t.Cleanup(func() {
if err := client.Stop(); err != nil {
t.Error(err)
}
})
done := make(chan struct{})
counter := 0
client.SetResponseCallback(func(req *types.Request, res *types.Response) {
// Process response
switch r := res.Value.(type) {
case *types.Response_DeliverTx:
counter++
if r.DeliverTx.Code != code.CodeTypeOK {
t.Error("DeliverTx failed with ret_code", r.DeliverTx.Code)
}
if counter > numDeliverTxs {
t.Fatalf("Too many DeliverTx responses. Got %d, expected %d", counter, numDeliverTxs)
}
if counter == numDeliverTxs {
go func() {
time.Sleep(time.Second * 1) // Wait for a bit to allow counter overflow
close(done)
}()
return
}
case *types.Response_Flush:
// ignore
default:
t.Error("Unexpected response type", reflect.TypeOf(res.Value))
}
})
ctx := context.Background()
// Write requests
for counter := 0; counter < numDeliverTxs; counter++ {
// Send request
_, err = client.DeliverTxAsync(ctx, types.RequestDeliverTx{Tx: []byte("test")})
require.NoError(t, err)
// Sometimes send flush messages
if counter%128 == 0 {
err = client.FlushSync(context.Background())
require.NoError(t, err)
}
}
// Send final flush message
_, err = client.FlushAsync(ctx)
require.NoError(t, err)
<-done
}
//-------------------------
// test grpc
func dialerFunc(ctx context.Context, addr string) (net.Conn, error) {
return tmnet.Connect(addr)
}
func testGRPCSync(t *testing.T, app types.ABCIApplicationServer) {
numDeliverTxs := 2000
socketFile := fmt.Sprintf("test-%08x.sock", rand.Int31n(1<<30))
defer os.Remove(socketFile)
socket := fmt.Sprintf("unix://%v", socketFile)
// Start the listener
server := abciserver.NewGRPCServer(socket, app)
server.SetLogger(log.TestingLogger().With("module", "abci-server"))
if err := server.Start(); err != nil {
t.Fatalf("Error starting GRPC server: %v", err.Error())
}
t.Cleanup(func() {
if err := server.Stop(); err != nil {
t.Error(err)
}
})
// Connect to the socket
conn, err := grpc.Dial(socket, grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc))
if err != nil {
t.Fatalf("Error dialing GRPC server: %v", err.Error())
}
t.Cleanup(func() {
if err := conn.Close(); err != nil {
t.Error(err)
}
})
client := types.NewABCIApplicationClient(conn)
// Write requests
for counter := 0; counter < numDeliverTxs; counter++ {
// Send request
response, err := client.DeliverTx(context.Background(), &types.RequestDeliverTx{Tx: []byte("test")})
if err != nil {
t.Fatalf("Error in GRPC DeliverTx: %v", err.Error())
}
counter++
if response.Code != code.CodeTypeOK {
t.Error("DeliverTx failed with ret_code", response.Code)
}
if counter > numDeliverTxs {
t.Fatal("Too many DeliverTx responses")
}
t.Log("response", counter)
if counter == numDeliverTxs {
go func() {
time.Sleep(time.Second * 1) // Wait for a bit to allow counter overflow
}()
}
}
}