Files
tendermint/abci/server/socket_server.go
Sam Kleinman 3e2d5db289 abci: remove lock protecting calls to the application interface (#7984)
Closes #7073

As part of the 0.36 cycle we've discussed and decided to remove the mutex in tendermint that protects the ABCI application. First, applications should be able to be responsible for their own concurrency control, and can make more fine-grained decisions about concurrent use than tendermint ever could. Second, I've observed in recent weeks as we've been making this change that the mutex wasn't applied particularly consistently in many cases (e.g. multiple "local" connections to the application had multiple locks, etc.) so this will give more consistent experiences across ABCI execution environments, and simplifies the tendermint ABCI handling code.
2022-02-24 19:53:04 +00:00

261 lines
6.5 KiB
Go

package server
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"net"
"runtime"
"sync"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
tmnet "github.com/tendermint/tendermint/libs/net"
"github.com/tendermint/tendermint/libs/service"
)
// var maxNumberConnections = 2
// SocketServer is an ABCI server that listens on a socket and hands the
// requests it reads from each accepted connection to the wrapped application.
type SocketServer struct {
	service.BaseService

	// app receives every application-level request read from a connection.
	app    types.Application
	logger log.Logger

	// proto and addr are the network and address passed to net.Listen;
	// listener is nil until OnStart succeeds.
	proto    string
	addr     string
	listener net.Listener

	// connsMtx guards connsClose and nextConnID.
	connsMtx   sync.Mutex
	connsClose map[int]func()
	nextConnID int
}
// NewSocketServer builds an ABCI socket server that serves app. protoAddr is
// split into a protocol and an address by tmnet.ProtocolAndAddress. The
// listener itself is left nil here and is created in OnStart.
func NewSocketServer(logger log.Logger, protoAddr string, app types.Application) service.Service {
	srv := &SocketServer{
		app:        app,
		logger:     logger,
		connsClose: make(map[int]func()),
	}
	srv.proto, srv.addr = tmnet.ProtocolAndAddress(protoAddr)

	// Created last because the base service is handed the server pointer.
	srv.BaseService = *service.NewBaseService(logger, "ABCIServer", srv)
	return srv
}
// OnStart opens the listener on the configured protocol and address and
// launches the accept loop in its own goroutine. It returns the error from
// net.Listen, if any, without starting the loop.
func (s *SocketServer) OnStart(ctx context.Context) error {
	listener, listenErr := net.Listen(s.proto, s.addr)
	if listenErr != nil {
		return listenErr
	}

	s.listener = listener
	go s.acceptConnectionsRoutine(ctx)

	return nil
}
// OnStop closes the listener, logging rather than returning any error, and
// then invokes every registered connection closer while holding connsMtx.
func (s *SocketServer) OnStop() {
	closeErr := s.listener.Close()
	if closeErr != nil {
		s.logger.Error("error closing listener", "err", closeErr)
	}

	s.connsMtx.Lock()
	defer s.connsMtx.Unlock()

	for _, cancelConn := range s.connsClose {
		cancelConn()
	}
}
// addConn registers closer under a fresh connection ID and returns that ID.
// IDs are taken sequentially from nextConnID under connsMtx.
func (s *SocketServer) addConn(closer func()) int {
	s.connsMtx.Lock()
	defer s.connsMtx.Unlock()

	id := s.nextConnID
	s.nextConnID = id + 1
	s.connsClose[id] = closer

	return id
}
// rmConn invokes the closer registered for connID and then drops it from the
// connection table. An unknown connID is a no-op.
func (s *SocketServer) rmConn(connID int) {
	s.connsMtx.Lock()
	defer s.connsMtx.Unlock()

	cancelConn, registered := s.connsClose[connID]
	if !registered {
		return
	}

	cancelConn()
	delete(s.connsClose, connID)
}
// acceptConnectionsRoutine accepts connections on s.listener until ctx is
// done, or until Accept fails while the service is no longer running. Each
// accepted connection gets its own cancellable context, an entry in the
// connection table, a buffered response channel, and two goroutines: one
// reading requests and one writing responses.
func (s *SocketServer) acceptConnectionsRoutine(ctx context.Context) {
	for {
		if ctx.Err() != nil {
			return
		}

		// Accept a connection
		s.logger.Info("Waiting for new connection...")
		conn, err := s.listener.Accept()
		if err != nil {
			if !s.IsRunning() {
				return // Ignore error from listener closing.
			}
			s.logger.Error("Failed to accept connection", "err", err)
			continue
		}

		cctx, ccancel := context.WithCancel(ctx)
		connID := s.addConn(ccancel)

		s.logger.Info("Accepted a new connection", "id", connID)

		responses := make(chan *types.Response, 1000) // A channel to buffer responses

		// closer is shared by both per-connection goroutines. The context is
		// cancelled on every call; the once guard ensures the connection is
		// closed, deregistered and logged exactly one time.
		once := &sync.Once{}
		closer := func(err error) {
			ccancel()
			once.Do(func() {
				// Report a failed Close based on its own result (cerr), not
				// on the error that triggered the shutdown.
				if cerr := conn.Close(); cerr != nil {
					s.logger.Error("error closing connection",
						"id", connID,
						"close_err", cerr,
						"err", err)
				}
				s.rmConn(connID)

				switch {
				case errors.Is(err, context.Canceled):
					s.logger.Error("Connection terminated",
						"id", connID,
						"err", err)
				case errors.Is(err, context.DeadlineExceeded):
					s.logger.Error("Connection encountered timeout",
						"id", connID,
						"err", err)
				case errors.Is(err, io.EOF):
					s.logger.Error("Connection was closed by client",
						"id", connID)
				case err != nil:
					s.logger.Error("Connection error",
						"id", connID,
						"err", err)
				default:
					s.logger.Error("Connection was closed",
						"id", connID)
				}
			})
		}

		// Read requests from conn and deal with them
		go s.handleRequests(cctx, closer, conn, responses)
		// Pull responses from 'responses' and write them to conn.
		go s.handleResponses(cctx, closer, conn, responses)
	}
}
// handleRequests reads requests from conn one at a time, runs each through
// processRequest, and queues the result on responses. It returns after
// calling closer when a read fails, when ctx is done while a response is
// waiting to be queued, or when a panic is recovered.
func (s *SocketServer) handleRequests(
	ctx context.Context,
	closer func(error),
	conn io.Reader,
	responses chan<- *types.Response,
) {
	reader := bufio.NewReader(conn)

	// make sure to recover from any app-related panics to allow proper socket cleanup
	defer func() {
		r := recover()
		if r == nil {
			return
		}

		const size = 64 << 10
		stack := make([]byte, size)
		stack = stack[:runtime.Stack(stack, false)]
		closer(fmt.Errorf("recovered from panic: %v\n%s", r, stack))
	}()

	for {
		req := new(types.Request)

		readErr := types.ReadMessage(reader, req)
		if readErr != nil {
			closer(fmt.Errorf("error reading message: %w", readErr))
			return
		}

		resp := s.processRequest(req)

		select {
		case responses <- resp:
			// Queued; go read the next request.
		case <-ctx.Done():
			closer(ctx.Err())
			return
		}
	}
}
// processRequest builds the response for a single request. Echo and Flush are
// answered by the server itself; every other recognised request type is
// passed to the matching method on s.app. An unrecognised request type yields
// an exception response.
func (s *SocketServer) processRequest(req *types.Request) *types.Response {
	var resp *types.Response

	switch v := req.Value.(type) {
	case *types.Request_Echo:
		resp = types.ToResponseEcho(v.Echo.Message)
	case *types.Request_Flush:
		resp = types.ToResponseFlush()
	case *types.Request_Info:
		resp = types.ToResponseInfo(s.app.Info(*v.Info))
	case *types.Request_CheckTx:
		resp = types.ToResponseCheckTx(s.app.CheckTx(*v.CheckTx))
	case *types.Request_Commit:
		resp = types.ToResponseCommit(s.app.Commit())
	case *types.Request_Query:
		resp = types.ToResponseQuery(s.app.Query(*v.Query))
	case *types.Request_InitChain:
		resp = types.ToResponseInitChain(s.app.InitChain(*v.InitChain))
	case *types.Request_ListSnapshots:
		resp = types.ToResponseListSnapshots(s.app.ListSnapshots(*v.ListSnapshots))
	case *types.Request_OfferSnapshot:
		resp = types.ToResponseOfferSnapshot(s.app.OfferSnapshot(*v.OfferSnapshot))
	case *types.Request_PrepareProposal:
		resp = types.ToResponsePrepareProposal(s.app.PrepareProposal(*v.PrepareProposal))
	case *types.Request_ProcessProposal:
		resp = types.ToResponseProcessProposal(s.app.ProcessProposal(*v.ProcessProposal))
	case *types.Request_LoadSnapshotChunk:
		resp = types.ToResponseLoadSnapshotChunk(s.app.LoadSnapshotChunk(*v.LoadSnapshotChunk))
	case *types.Request_ApplySnapshotChunk:
		resp = types.ToResponseApplySnapshotChunk(s.app.ApplySnapshotChunk(*v.ApplySnapshotChunk))
	case *types.Request_ExtendVote:
		resp = types.ToResponseExtendVote(s.app.ExtendVote(*v.ExtendVote))
	case *types.Request_VerifyVoteExtension:
		resp = types.ToResponseVerifyVoteExtension(s.app.VerifyVoteExtension(*v.VerifyVoteExtension))
	case *types.Request_FinalizeBlock:
		resp = types.ToResponseFinalizeBlock(s.app.FinalizeBlock(*v.FinalizeBlock))
	default:
		resp = types.ToResponseException("Unknown request")
	}

	return resp
}
// handleResponses drains the responses channel, writing each response to conn
// through a buffered writer that is flushed after every message. It returns
// after calling closer when ctx is done or when a write or flush fails.
func (s *SocketServer) handleResponses(
	ctx context.Context,
	closer func(error),
	conn io.Writer,
	responses <-chan *types.Response,
) {
	writer := bufio.NewWriter(conn)

	for {
		select {
		case <-ctx.Done():
			closer(ctx.Err())
			return

		case resp := <-responses:
			// Write and flush failures are reported with the same message.
			err := types.WriteMessage(resp, writer)
			if err == nil {
				err = writer.Flush()
			}
			if err != nil {
				closer(fmt.Errorf("error writing message: %w", err))
				return
			}
		}
	}
}