abci: remove lock protecting calls to the application interface (#7984)

Closes #7073

As part of the 0.36 cycle we've discussed and decided to remove the mutex in tendermint that protects the ABCI application. First, applications should be able to be responsible for their own concurrency control, and can make more fine-grained decisions about concurrent use than tendermint ever could. Second, I've observed in recent weeks as we've been making this change that the mutex wasn't applied particularly consistently in many cases (e.g. multiple "local" connections to the application had multiple locks, etc.) so this will give more consistent experiences across ABCI execution environments, and simplifies the tendermint ABCI handling code.
This commit is contained in:
Sam Kleinman
2022-02-24 14:53:04 -05:00
committed by GitHub
parent f795d3f360
commit 3e2d5db289
6 changed files with 131 additions and 184 deletions

View File

@@ -91,14 +91,18 @@ RETRY_LOOP:
}
func (cli *grpcClient) OnStop() {
cli.mtx.Lock()
defer cli.mtx.Unlock()
if cli.conn != nil {
cli.conn.Close()
cli.err = cli.conn.Close()
}
}
func (cli *grpcClient) Error() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
}

View File

@@ -2,7 +2,6 @@ package abciclient
import (
"context"
"sync"
types "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
@@ -15,8 +14,6 @@ import (
// RPC endpoint), but defers are used everywhere for the sake of consistency.
type localClient struct {
service.BaseService
mtx sync.Mutex
types.Application
}
@@ -47,113 +44,71 @@ func (app *localClient) Echo(_ context.Context, msg string) (*types.ResponseEcho
}
func (app *localClient) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.Info(req)
return &res, nil
}
func (app *localClient) CheckTx(_ context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.CheckTx(req)
return &res, nil
}
func (app *localClient) Query(_ context.Context, req types.RequestQuery) (*types.ResponseQuery, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.Query(req)
return &res, nil
}
func (app *localClient) Commit(ctx context.Context) (*types.ResponseCommit, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.Commit()
return &res, nil
}
func (app *localClient) InitChain(_ context.Context, req types.RequestInitChain) (*types.ResponseInitChain, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.InitChain(req)
return &res, nil
}
func (app *localClient) ListSnapshots(_ context.Context, req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.ListSnapshots(req)
return &res, nil
}
func (app *localClient) OfferSnapshot(_ context.Context, req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.OfferSnapshot(req)
return &res, nil
}
func (app *localClient) LoadSnapshotChunk(_ context.Context, req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.LoadSnapshotChunk(req)
return &res, nil
}
func (app *localClient) ApplySnapshotChunk(_ context.Context, req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.ApplySnapshotChunk(req)
return &res, nil
}
func (app *localClient) PrepareProposal(_ context.Context, req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.PrepareProposal(req)
return &res, nil
}
func (app *localClient) ProcessProposal(_ context.Context, req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.ProcessProposal(req)
return &res, nil
}
func (app *localClient) ExtendVote(_ context.Context, req types.RequestExtendVote) (*types.ResponseExtendVote, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.ExtendVote(req)
return &res, nil
}
func (app *localClient) VerifyVoteExtension(_ context.Context, req types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.VerifyVoteExtension(req)
return &res, nil
}
func (app *localClient) FinalizeBlock(_ context.Context, req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.FinalizeBlock(req)
return &res, nil
}

View File

@@ -16,10 +16,9 @@ type GRPCServer struct {
service.BaseService
logger log.Logger
proto string
addr string
listener net.Listener
server *grpc.Server
proto string
addr string
server *grpc.Server
app types.ABCIApplicationServer
}
@@ -28,11 +27,10 @@ type GRPCServer struct {
func NewGRPCServer(logger log.Logger, protoAddr string, app types.ABCIApplicationServer) service.Service {
proto, addr := tmnet.ProtocolAndAddress(protoAddr)
s := &GRPCServer{
logger: logger,
proto: proto,
addr: addr,
listener: nil,
app: app,
logger: logger,
proto: proto,
addr: addr,
app: app,
}
s.BaseService = *service.NewBaseService(logger, "ABCIServer", s)
return s
@@ -40,13 +38,11 @@ func NewGRPCServer(logger log.Logger, protoAddr string, app types.ABCIApplicatio
// OnStart starts the gRPC service.
func (s *GRPCServer) OnStart(ctx context.Context) error {
ln, err := net.Listen(s.proto, s.addr)
if err != nil {
return err
}
s.listener = ln
s.server = grpc.NewServer()
types.RegisterABCIApplicationServer(s.server, s.app)
@@ -57,7 +53,7 @@ func (s *GRPCServer) OnStart(ctx context.Context) error {
s.server.GracefulStop()
}()
if err := s.server.Serve(s.listener); err != nil {
if err := s.server.Serve(ln); err != nil {
s.logger.Error("error serving gRPC server", "err", err)
}
}()
@@ -65,6 +61,4 @@ func (s *GRPCServer) OnStart(ctx context.Context) error {
}
// OnStop stops the gRPC server.
func (s *GRPCServer) OnStop() {
s.server.Stop()
}
func (s *GRPCServer) OnStop() { s.server.Stop() }

View File

@@ -3,6 +3,7 @@ package server
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"net"
@@ -26,22 +27,21 @@ type SocketServer struct {
listener net.Listener
connsMtx sync.Mutex
conns map[int]net.Conn
connsClose map[int]func()
nextConnID int
appMtx sync.Mutex
app types.Application
app types.Application
}
func NewSocketServer(logger log.Logger, protoAddr string, app types.Application) service.Service {
proto, addr := tmnet.ProtocolAndAddress(protoAddr)
s := &SocketServer{
logger: logger,
proto: proto,
addr: addr,
listener: nil,
app: app,
conns: make(map[int]net.Conn),
logger: logger,
proto: proto,
addr: addr,
listener: nil,
app: app,
connsClose: make(map[int]func()),
}
s.BaseService = *service.NewBaseService(logger, "ABCIServer", s)
return s
@@ -67,44 +67,35 @@ func (s *SocketServer) OnStop() {
s.connsMtx.Lock()
defer s.connsMtx.Unlock()
for id, conn := range s.conns {
delete(s.conns, id)
if err := conn.Close(); err != nil {
s.logger.Error("error closing connection", "id", id, "conn", conn, "err", err)
}
for _, closer := range s.connsClose {
closer()
}
}
func (s *SocketServer) addConn(conn net.Conn) int {
func (s *SocketServer) addConn(closer func()) int {
s.connsMtx.Lock()
defer s.connsMtx.Unlock()
connID := s.nextConnID
s.nextConnID++
s.conns[connID] = conn
s.connsClose[connID] = closer
return connID
}
// deletes conn even if close errs
func (s *SocketServer) rmConn(connID int) error {
func (s *SocketServer) rmConn(connID int) {
s.connsMtx.Lock()
defer s.connsMtx.Unlock()
conn, ok := s.conns[connID]
if !ok {
return fmt.Errorf("connection %d does not exist", connID)
if closer, ok := s.connsClose[connID]; ok {
closer()
delete(s.connsClose, connID)
}
delete(s.conns, connID)
return conn.Close()
}
func (s *SocketServer) acceptConnectionsRoutine(ctx context.Context) {
for {
if ctx.Err() != nil {
return
}
// Accept a connection
@@ -118,149 +109,134 @@ func (s *SocketServer) acceptConnectionsRoutine(ctx context.Context) {
continue
}
s.logger.Info("Accepted a new connection")
cctx, ccancel := context.WithCancel(ctx)
connID := s.addConn(ccancel)
connID := s.addConn(conn)
s.logger.Info("Accepted a new connection", "id", connID)
closeConn := make(chan error, 2) // Push to signal connection closed
responses := make(chan *types.Response, 1000) // A channel to buffer responses
once := &sync.Once{}
closer := func(err error) {
ccancel()
once.Do(func() {
if cerr := conn.Close(); err != nil {
s.logger.Error("error closing connection",
"id", connID,
"close_err", cerr,
"err", err)
}
s.rmConn(connID)
switch {
case errors.Is(err, context.Canceled):
s.logger.Error("Connection terminated",
"id", connID,
"err", err)
case errors.Is(err, context.DeadlineExceeded):
s.logger.Error("Connection encountered timeout",
"id", connID,
"err", err)
case errors.Is(err, io.EOF):
s.logger.Error("Connection was closed by client",
"id", connID)
case err != nil:
s.logger.Error("Connection error",
"id", connID,
"err", err)
default:
s.logger.Error("Connection was closed",
"id", connID)
}
})
}
// Read requests from conn and deal with them
go s.handleRequests(ctx, closeConn, conn, responses)
go s.handleRequests(cctx, closer, conn, responses)
// Pull responses from 'responses' and write them to conn.
go s.handleResponses(ctx, closeConn, conn, responses)
// Wait until signal to close connection
go s.waitForClose(ctx, closeConn, connID)
}
}
func (s *SocketServer) waitForClose(ctx context.Context, closeConn chan error, connID int) {
defer func() {
// Close the connection
if err := s.rmConn(connID); err != nil {
s.logger.Error("error closing connection", "err", err)
}
}()
select {
case <-ctx.Done():
return
case err := <-closeConn:
switch {
case err == io.EOF:
s.logger.Error("Connection was closed by client")
case err != nil:
s.logger.Error("Connection error", "err", err)
default:
// never happens
s.logger.Error("Connection was closed")
}
go s.handleResponses(cctx, closer, conn, responses)
}
}
// Read requests from conn and deal with them
func (s *SocketServer) handleRequests(
ctx context.Context,
closeConn chan error,
closer func(error),
conn io.Reader,
responses chan<- *types.Response,
) {
var count int
var bufReader = bufio.NewReader(conn)
defer func() {
// make sure to recover from any app-related panics to allow proper socket cleanup
r := recover()
if r != nil {
if r := recover(); r != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
err := fmt.Errorf("recovered from panic: %v\n%s", r, buf)
closeConn <- err
s.appMtx.Unlock()
closer(fmt.Errorf("recovered from panic: %v\n%s", r, buf))
}
}()
for {
if ctx.Err() != nil {
req := &types.Request{}
if err := types.ReadMessage(bufReader, req); err != nil {
closer(fmt.Errorf("error reading message: %w", err))
return
}
var req = &types.Request{}
err := types.ReadMessage(bufReader, req)
if err != nil {
if err == io.EOF {
closeConn <- err
} else {
closeConn <- fmt.Errorf("error reading message: %w", err)
}
resp := s.processRequest(req)
select {
case <-ctx.Done():
closer(ctx.Err())
return
case responses <- resp:
}
s.appMtx.Lock()
count++
s.handleRequest(req, responses)
s.appMtx.Unlock()
}
}
func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) {
func (s *SocketServer) processRequest(req *types.Request) *types.Response {
switch r := req.Value.(type) {
case *types.Request_Echo:
responses <- types.ToResponseEcho(r.Echo.Message)
return types.ToResponseEcho(r.Echo.Message)
case *types.Request_Flush:
responses <- types.ToResponseFlush()
return types.ToResponseFlush()
case *types.Request_Info:
res := s.app.Info(*r.Info)
responses <- types.ToResponseInfo(res)
return types.ToResponseInfo(s.app.Info(*r.Info))
case *types.Request_CheckTx:
res := s.app.CheckTx(*r.CheckTx)
responses <- types.ToResponseCheckTx(res)
return types.ToResponseCheckTx(s.app.CheckTx(*r.CheckTx))
case *types.Request_Commit:
res := s.app.Commit()
responses <- types.ToResponseCommit(res)
return types.ToResponseCommit(s.app.Commit())
case *types.Request_Query:
res := s.app.Query(*r.Query)
responses <- types.ToResponseQuery(res)
return types.ToResponseQuery(s.app.Query(*r.Query))
case *types.Request_InitChain:
res := s.app.InitChain(*r.InitChain)
responses <- types.ToResponseInitChain(res)
return types.ToResponseInitChain(s.app.InitChain(*r.InitChain))
case *types.Request_ListSnapshots:
res := s.app.ListSnapshots(*r.ListSnapshots)
responses <- types.ToResponseListSnapshots(res)
return types.ToResponseListSnapshots(s.app.ListSnapshots(*r.ListSnapshots))
case *types.Request_OfferSnapshot:
res := s.app.OfferSnapshot(*r.OfferSnapshot)
responses <- types.ToResponseOfferSnapshot(res)
return types.ToResponseOfferSnapshot(s.app.OfferSnapshot(*r.OfferSnapshot))
case *types.Request_PrepareProposal:
res := s.app.PrepareProposal(*r.PrepareProposal)
responses <- types.ToResponsePrepareProposal(res)
return types.ToResponsePrepareProposal(s.app.PrepareProposal(*r.PrepareProposal))
case *types.Request_ProcessProposal:
res := s.app.ProcessProposal(*r.ProcessProposal)
responses <- types.ToResponseProcessProposal(res)
return types.ToResponseProcessProposal(s.app.ProcessProposal(*r.ProcessProposal))
case *types.Request_LoadSnapshotChunk:
res := s.app.LoadSnapshotChunk(*r.LoadSnapshotChunk)
responses <- types.ToResponseLoadSnapshotChunk(res)
return types.ToResponseLoadSnapshotChunk(s.app.LoadSnapshotChunk(*r.LoadSnapshotChunk))
case *types.Request_ApplySnapshotChunk:
res := s.app.ApplySnapshotChunk(*r.ApplySnapshotChunk)
responses <- types.ToResponseApplySnapshotChunk(res)
return types.ToResponseApplySnapshotChunk(s.app.ApplySnapshotChunk(*r.ApplySnapshotChunk))
case *types.Request_ExtendVote:
res := s.app.ExtendVote(*r.ExtendVote)
responses <- types.ToResponseExtendVote(res)
return types.ToResponseExtendVote(s.app.ExtendVote(*r.ExtendVote))
case *types.Request_VerifyVoteExtension:
res := s.app.VerifyVoteExtension(*r.VerifyVoteExtension)
responses <- types.ToResponseVerifyVoteExtension(res)
return types.ToResponseVerifyVoteExtension(s.app.VerifyVoteExtension(*r.VerifyVoteExtension))
case *types.Request_FinalizeBlock:
res := s.app.FinalizeBlock(*r.FinalizeBlock)
responses <- types.ToResponseFinalizeBlock(res)
return types.ToResponseFinalizeBlock(s.app.FinalizeBlock(*r.FinalizeBlock))
default:
responses <- types.ToResponseException("Unknown request")
return types.ToResponseException("Unknown request")
}
}
// Pull responses from 'responses' and write them to conn.
func (s *SocketServer) handleResponses(
ctx context.Context,
closeConn chan error,
closer func(error),
conn io.Writer,
responses <-chan *types.Response,
) {
@@ -268,21 +244,15 @@ func (s *SocketServer) handleResponses(
for {
select {
case <-ctx.Done():
closer(ctx.Err())
return
case res := <-responses:
if err := types.WriteMessage(res, bw); err != nil {
select {
case <-ctx.Done():
case closeConn <- fmt.Errorf("error writing message: %w", err):
}
closer(fmt.Errorf("error writing message: %w", err))
return
}
if err := bw.Flush(); err != nil {
select {
case <-ctx.Done():
case closeConn <- fmt.Errorf("error flushing write buffer: %w", err):
}
closer(fmt.Errorf("error writing message: %w", err))
return
}
}