diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index 18ee003d9..91dad91ba 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -17,15 +17,13 @@ import ( "github.com/tendermint/tendermint/libs/timer" ) -const reqQueueSize = 256 // TODO make configurable -// const maxResponseSize = 1048576 // 1MB TODO make configurable -const flushThrottleMS = 20 // Don't wait longer than... +const ( + reqQueueSize = 256 // TODO make configurable + flushThrottleMS = 20 // Don't wait longer than... +) -var _ Client = (*socketClient)(nil) - -// This is goroutine-safe, but users should beware that -// the application in general is not meant to be interfaced -// with concurrent callers. +// This is goroutine-safe, but users should beware that the application in +// general is not meant to be interfaced with concurrent callers. type socketClient struct { service.BaseService @@ -40,9 +38,13 @@ type socketClient struct { err error reqSent *list.List // list of requests sent, waiting for response resCb func(*types.Request, *types.Response) // called on all requests, if set. - } +var _ Client = (*socketClient)(nil) + +// NewSocketClient creates a new socket client, which connects to a given +// address. If mustConnect is true, the client will return an error upon start +// if it fails to connect. func NewSocketClient(addr string, mustConnect bool) Client { cli := &socketClient{ reqQueue: make(chan *ReqRes, reqQueueSize), @@ -57,19 +59,24 @@ func NewSocketClient(addr string, mustConnect bool) Client { return cli } +// OnStart implements Service by connecting to the server and spawning reading +// and writing goroutines. func (cli *socketClient) OnStart() error { - var err error - var conn net.Conn -RETRY_LOOP: + var ( + err error + conn net.Conn + ) + for { conn, err = tmnet.Connect(cli.addr) if err != nil { if cli.mustConnect { return err } - cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying...", cli.addr), "err", err) + cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying after %vs...", + cli.addr, dialRetryIntervalSeconds), "err", err) time.Sleep(time.Second * dialRetryIntervalSeconds) - continue RETRY_LOOP + continue } cli.conn = conn @@ -80,39 +87,26 @@ RETRY_LOOP: } } +// OnStop implements Service by closing connection and flushing all queues. func (cli *socketClient) OnStop() { if cli.conn != nil { cli.conn.Close() } - cli.mtx.Lock() - defer cli.mtx.Unlock() cli.flushQueue() + cli.flushTimer.Stop() } -// Stop the client and set the error -func (cli *socketClient) StopForError(err error) { - if !cli.IsRunning() { - return - } - - cli.mtx.Lock() - if cli.err == nil { - cli.err = err - } - cli.mtx.Unlock() - - cli.Logger.Error(fmt.Sprintf("Stopping abci.socketClient for error: %v", err.Error())) - cli.Stop() -} - +// Error returns an error if the client was stopped abruptly. func (cli *socketClient) Error() error { cli.mtx.Lock() defer cli.mtx.Unlock() return cli.err } -// Set listener for all responses +// SetResponseCallback sets a callback, which will be executed for each +// non-error & non-empty response from the server. +// // NOTE: callback may get internally generated flush responses. func (cli *socketClient) SetResponseCallback(resCb Callback) { cli.mtx.Lock() @@ -123,11 +117,28 @@ func (cli *socketClient) SetResponseCallback(resCb Callback) { //---------------------------------------- func (cli *socketClient) sendRequestsRoutine(conn io.Writer) { - w := bufio.NewWriter(conn) for { select { - case <-cli.flushTimer.Ch: + case reqres := <-cli.reqQueue: + // cli.Logger.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) + + cli.willSendReq(reqres) + err := types.WriteMessage(reqres.Request, w) + if err != nil { + cli.stopForError(fmt.Errorf("write to buffer: %w", err)) + return + } + + // If it's a flush request, flush the current buffer. + if _, ok := reqres.Request.Value.(*types.Request_Flush); ok { + err = w.Flush() + if err != nil { + cli.stopForError(fmt.Errorf("flush buffer: %w", err)) + return + } + } + case <-cli.flushTimer.Ch: // flush queue select { case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): default: @@ -135,45 +146,31 @@ func (cli *socketClient) sendRequestsRoutine(conn io.Writer) { } case <-cli.Quit(): return - case reqres := <-cli.reqQueue: - cli.willSendReq(reqres) - err := types.WriteMessage(reqres.Request, w) - if err != nil { - cli.StopForError(fmt.Errorf("error writing msg: %v", err)) - return - } - // cli.Logger.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) - if _, ok := reqres.Request.Value.(*types.Request_Flush); ok { - err = w.Flush() - if err != nil { - cli.StopForError(fmt.Errorf("error flushing writer: %v", err)) - return - } - } } } } func (cli *socketClient) recvResponseRoutine(conn io.Reader) { - - r := bufio.NewReader(conn) // Buffer reads + r := bufio.NewReader(conn) for { var res = &types.Response{} err := types.ReadMessage(r, res) if err != nil { - cli.StopForError(err) + cli.stopForError(fmt.Errorf("read message: %w", err)) return } + + // cli.Logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res) + switch r := res.Value.(type) { - case *types.Response_Exception: + case *types.Response_Exception: // app responded with error // XXX After setting cli.err, release waiters (e.g. reqres.Done()) - cli.StopForError(errors.New(r.Exception.Error)) + cli.stopForError(errors.New(r.Exception.Error)) return default: - // cli.Logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res) err := cli.didRecvResponse(res) if err != nil { - cli.StopForError(err) + cli.stopForError(err) return } } @@ -190,20 +187,21 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error { cli.mtx.Lock() defer cli.mtx.Unlock() - // Get the first ReqRes + // Get the first ReqRes. next := cli.reqSent.Front() if next == nil { - return fmt.Errorf("unexpected result type %v when nothing expected", reflect.TypeOf(res.Value)) + return fmt.Errorf("unexpected %v when nothing expected", reflect.TypeOf(res.Value)) } + reqres := next.Value.(*ReqRes) if !resMatchesReq(reqres.Request, res) { - return fmt.Errorf("unexpected result type %v when response to %v expected", + return fmt.Errorf("unexpected %v when response to %v expected", reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value)) } - reqres.Response = res // Set response - reqres.Done() // Release waiters - cli.reqSent.Remove(next) // Pop first item from linked list + reqres.Response = res + reqres.Done() // release waiters + cli.reqSent.Remove(next) // pop first item from linked list // Notify client listener if set (global callback). if cli.resCb != nil { @@ -211,8 +209,9 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error { } // Notify reqRes listener if set (request specific callback). - // NOTE: it is possible this callback isn't set on the reqres object. - // at this point, in which case it will be called after, when it is set. + // + // NOTE: It is possible this callback isn't set on the reqres object. At this + // point, in which case it will be called after, when it is set. if cb := reqres.GetCallback(); cb != nil { cb(res) } @@ -438,6 +437,9 @@ func (cli *socketClient) queueRequest(req *types.Request) *ReqRes { } func (cli *socketClient) flushQueue() { + cli.mtx.Lock() + defer cli.mtx.Unlock() + // mark all in-flight messages as resolved (they will get cli.Error()) for req := cli.reqSent.Front(); req != nil; req = req.Next() { reqres := req.Value.(*ReqRes) @@ -485,3 +487,18 @@ func resMatchesReq(req *types.Request, res *types.Response) (ok bool) { } return ok } + +func (cli *socketClient) stopForError(err error) { + if !cli.IsRunning() { + return + } + + cli.mtx.Lock() + if cli.err == nil { + cli.err = err + } + cli.mtx.Unlock() + + cli.Logger.Error(fmt.Sprintf("Stopping abci.socketClient for error: %v", err.Error())) + cli.Stop() +} diff --git a/abci/client/socket_client_test.go b/abci/client/socket_client_test.go index f85eca8ae..90b894b71 100644 --- a/abci/client/socket_client_test.go +++ b/abci/client/socket_client_test.go @@ -1,7 +1,6 @@ package abcicli_test import ( - "errors" "fmt" "testing" "time" @@ -16,29 +15,6 @@ import ( "github.com/tendermint/tendermint/libs/service" ) -type errorStopper interface { - StopForError(error) -} - -func TestSocketClientStopForErrorDeadlock(t *testing.T) { - c := abcicli.NewSocketClient(":80", false).(errorStopper) - err := errors.New("foo-tendermint") - - // See Issue https://github.com/tendermint/abci/issues/114 - doneChan := make(chan bool) - go func() { - defer close(doneChan) - c.StopForError(err) - c.StopForError(err) - }() - - select { - case <-doneChan: - case <-time.After(time.Second * 4): - t.Fatalf("Test took too long, potential deadlock still exists") - } -} - func TestProperSyncCalls(t *testing.T) { app := slowApp{} diff --git a/libs/bits/bit_array.go b/libs/bits/bit_array.go index b0e4ecae9..9d6901460 100644 --- a/libs/bits/bit_array.go +++ b/libs/bits/bit_array.go @@ -355,14 +355,12 @@ func (bA *BitArray) Update(o *BitArray) { if bA == nil || o == nil { return } + bA.mtx.Lock() o.mtx.Lock() - defer func() { - bA.mtx.Unlock() - o.mtx.Unlock() - }() - copy(bA.Elems, o.Elems) + o.mtx.Unlock() + bA.mtx.Unlock() } // MarshalJSON implements json.Marshaler interface by marshaling bit array