diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 5fb219e24..524df1ef8 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -39,7 +39,7 @@ jobs: platforms: all - name: Set up Docker Build - uses: docker/setup-buildx-action@v1.6.0 + uses: docker/setup-buildx-action@v1.7.0 - name: Login to DockerHub if: ${{ github.event_name != 'pull_request' }} diff --git a/cmd/priv_val_server/main.go b/cmd/priv_val_server/main.go index e34236acc..901422145 100644 --- a/cmd/priv_val_server/main.go +++ b/cmd/priv_val_server/main.go @@ -113,7 +113,7 @@ func main() { // add prometheus metrics for unary RPC calls opts = append(opts, grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor)) - ss := grpcprivval.NewSignerServer(*chainID, pv, logger) + ss := grpcprivval.NewSignerServer(logger, *chainID, pv) protocol, address := tmnet.ProtocolAndAddress(*addr) diff --git a/go.mod b/go.mod index f03deab1a..ff5b4e11f 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.17 require ( github.com/BurntSushi/toml v1.1.0 github.com/adlio/schema v1.3.0 - github.com/btcsuite/btcd v0.22.0-beta + github.com/btcsuite/btcd v0.22.1 github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce github.com/fortytw2/leaktest v1.3.0 github.com/go-kit/kit v0.12.0 @@ -75,7 +75,7 @@ require ( github.com/charithe/durationcheck v0.0.9 // indirect github.com/chavacava/garif v0.0.0-20210405164556-e8a0a408d6af // indirect github.com/containerd/continuity v0.2.1 // indirect - github.com/creachadair/tomledit v0.0.18 + github.com/creachadair/tomledit v0.0.19 github.com/daixiang0/gci v0.3.3 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/denis-tingaikin/go-header v0.4.3 // indirect diff --git a/go.sum b/go.sum index 07dedbb7b..a7b2d7bd8 100644 --- a/go.sum +++ b/go.sum @@ -157,8 +157,10 @@ github.com/breml/bidichk v0.2.2/go.mod h1:zbfeitpevDUGI7V91Uzzuwrn4Vls8MoBMrwtt7 github.com/breml/errchkjson v0.2.3 h1:97eGTmR/w0paL2SwfRPI1jaAZHaH/fXnxWTw2eEIqE0= github.com/breml/errchkjson v0.2.3/go.mod h1:jZEATw/jF69cL1iy7//Yih8yp/mXp2CBoBr9GJwCAsY= github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= -github.com/btcsuite/btcd v0.22.0-beta h1:LTDpDKUM5EeOFBPM8IXpinEcmZ6FWfNZbE3lfrfdnWo= -github.com/btcsuite/btcd v0.22.0-beta/go.mod h1:9n5ntfhhHQBIhUvlhDvD3Qg6fRUj4jkN0VB8L8svzOA= +github.com/btcsuite/btcd v0.22.1 h1:CnwP9LM/M9xuRrGSCGeMVs9iv09uMqwsVX7EeIpgV2c= +github.com/btcsuite/btcd v0.22.1/go.mod h1:wqgTSL29+50LRkmOVknEdmt8ZojIzhuWvgu/iptuN7Y= +github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= +github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce h1:YtWJF7RHm2pYCvA5t0RPmAaLUhREsKuKd+SLhxFbFeQ= @@ -229,8 +231,8 @@ github.com/creachadair/atomicfile v0.2.5 h1:wkOlpsjyJOvJ3Hd8juHKdirJnCSIPacvtY21 github.com/creachadair/atomicfile v0.2.5/go.mod h1:BRq8Une6ckFneYXZQ+kO7p1ZZP3I2fzVzf28JxrIkBc= github.com/creachadair/taskgroup v0.3.2 h1:zlfutDS+5XG40AOxcHDSThxKzns8Tnr9jnr6VqkYlkM= github.com/creachadair/taskgroup v0.3.2/go.mod h1:wieWwecHVzsidg2CsUnFinW1faVN4+kq+TDlRJQ0Wbk= -github.com/creachadair/tomledit v0.0.18 
h1:gleeCnEgMPYcaBz5KN/1CN9yycF5n6Q2py7Ahu6yc7A= -github.com/creachadair/tomledit v0.0.18/go.mod h1:gvtfnSZLa+YNQD28vaPq0Nk12bRxEhmUdBzAWn+EGF4= +github.com/creachadair/tomledit v0.0.19 h1:zbpfUtYFYFdpRjwJY9HJlto1iZ4M5YwYB6qqc37F6UM= +github.com/creachadair/tomledit v0.0.19/go.mod h1:gvtfnSZLa+YNQD28vaPq0Nk12bRxEhmUdBzAWn+EGF4= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4= diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go index b662773f1..47e85a82b 100644 --- a/internal/blocksync/pool.go +++ b/internal/blocksync/pool.go @@ -28,7 +28,7 @@ eg, L = latency = 0.1s */ const ( - requestIntervalMS = 2 + requestInterval = 2 * time.Millisecond maxTotalRequesters = 600 maxPeerErrBuffer = 1000 maxPendingRequests = maxTotalRequesters @@ -131,27 +131,23 @@ func (*BlockPool) OnStop() {} // spawns requesters as needed func (pool *BlockPool) makeRequestersRoutine(ctx context.Context) { - for { - if !pool.IsRunning() { - break + for pool.IsRunning() { + if ctx.Err() != nil { + return } _, numPending, lenRequesters := pool.GetStatus() - switch { - case numPending >= maxPendingRequests: - // sleep for a bit. - time.Sleep(requestIntervalMS * time.Millisecond) - // check for timed out peers + if numPending >= maxPendingRequests || lenRequesters >= maxTotalRequesters { + // This is preferable to using a timer because the request interval + // is so small. Larger request intervals may necessitate using a + // timer/ticker. + time.Sleep(requestInterval) pool.removeTimedoutPeers() - case lenRequesters >= maxTotalRequesters: - // sleep for a bit. - time.Sleep(requestIntervalMS * time.Millisecond) - // check for timed out peers - pool.removeTimedoutPeers() - default: - // request for more blocks. - pool.makeNextRequester(ctx) + continue } + + // request for more blocks. + pool.makeNextRequester(ctx) } } @@ -650,9 +646,16 @@ OUTER_LOOP: if !bpr.IsRunning() || !bpr.pool.IsRunning() { return } + if ctx.Err() != nil { + return + } + peer = bpr.pool.pickIncrAvailablePeer(bpr.height) if peer == nil { - time.Sleep(requestIntervalMS * time.Millisecond) + // This is preferable to using a timer because the request + // interval is so small. Larger request intervals may + // necessitate using a timer/ticker. + time.Sleep(requestInterval) continue PICK_PEER_LOOP } break PICK_PEER_LOOP diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go index 779e6faf2..43a5296ed 100644 --- a/internal/blocksync/reactor.go +++ b/internal/blocksync/reactor.go @@ -210,7 +210,7 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // It will handle errors and any possible panics gracefully. A caller can handle // any error returned by sending a PeerError on the respective channel. 
-func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) { +func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) { defer func() { if e := recover(); e != nil { err = fmt.Errorf("panic in processing message: %v", e) @@ -224,7 +224,7 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop r.logger.Debug("received message", "message", envelope.Message, "peer", envelope.From) - switch chID { + switch envelope.ChannelID { case BlockSyncChannel: switch msg := envelope.Message.(type) { case *bcproto.BlockRequest: @@ -258,7 +258,7 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop } default: - err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) + err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope) } return err @@ -273,12 +273,12 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Chann iter := blockSyncCh.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() - if err := r.handleMessage(ctx, blockSyncCh.ID, envelope, blockSyncCh); err != nil { + if err := r.handleMessage(ctx, envelope, blockSyncCh); err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return } - r.logger.Error("failed to process message", "ch_id", blockSyncCh.ID, "envelope", envelope, "err", err) + r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err) if serr := blockSyncCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index 53e0b540d..eea74b5e1 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -1266,7 +1266,7 @@ func (r *Reactor) handleVoteSetBitsMessage(ctx context.Context, envelope *p2p.En // the p2p channel. // // NOTE: We block on consensus state for proposals, block parts, and votes. -func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope, chans channelBundle) (err error) { +func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans channelBundle) (err error) { defer func() { if e := recover(); e != nil { err = fmt.Errorf("panic in processing message: %v", e) @@ -1284,18 +1284,19 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop // and because a large part of the core business logic depends on these // domain types opposed to simply working with the Proto types. 
protoMsg := new(tmcons.Message) - if err := protoMsg.Wrap(envelope.Message); err != nil { + if err = protoMsg.Wrap(envelope.Message); err != nil { return err } - msgI, err := MsgFromProto(protoMsg) + var msgI Message + msgI, err = MsgFromProto(protoMsg) if err != nil { return err } - r.logger.Debug("received message", "ch_id", chID, "message", msgI, "peer", envelope.From) + r.logger.Debug("received message", "ch_id", envelope.ChannelID, "message", msgI, "peer", envelope.From) - switch chID { + switch envelope.ChannelID { case StateChannel: err = r.handleStateMessage(ctx, envelope, msgI, chans.votSet) case DataChannel: @@ -1305,7 +1306,7 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop case VoteSetBitsChannel: err = r.handleVoteSetBitsMessage(ctx, envelope, msgI) default: - err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) + err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope) } return err @@ -1320,8 +1321,8 @@ func (r *Reactor) processStateCh(ctx context.Context, chans channelBundle) { iter := chans.state.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() - if err := r.handleMessage(ctx, chans.state.ID, envelope, chans); err != nil { - r.logger.Error("failed to process message", "ch_id", chans.state.ID, "envelope", envelope, "err", err) + if err := r.handleMessage(ctx, envelope, chans); err != nil { + r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err) if serr := chans.state.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, @@ -1341,8 +1342,8 @@ func (r *Reactor) processDataCh(ctx context.Context, chans channelBundle) { iter := chans.data.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() - if err := r.handleMessage(ctx, chans.data.ID, envelope, chans); err != nil { - r.logger.Error("failed to process message", "ch_id", chans.data.ID, "envelope", envelope, "err", err) + if err := r.handleMessage(ctx, envelope, chans); err != nil { + r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err) if serr := chans.data.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, @@ -1362,8 +1363,8 @@ func (r *Reactor) processVoteCh(ctx context.Context, chans channelBundle) { iter := chans.vote.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() - if err := r.handleMessage(ctx, chans.vote.ID, envelope, chans); err != nil { - r.logger.Error("failed to process message", "ch_id", chans.vote.ID, "envelope", envelope, "err", err) + if err := r.handleMessage(ctx, envelope, chans); err != nil { + r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err) if serr := chans.vote.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, @@ -1384,12 +1385,12 @@ func (r *Reactor) processVoteSetBitsCh(ctx context.Context, chans channelBundle) for iter.Next(ctx) { envelope := iter.Envelope() - if err := r.handleMessage(ctx, chans.votSet.ID, envelope, chans); err != nil { + if err := r.handleMessage(ctx, envelope, chans); err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return } - r.logger.Error("failed to process message", "ch_id", chans.votSet.ID, "envelope", envelope, "err", err) + r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err) if serr := chans.votSet.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, 
Err: err, diff --git a/internal/consensus/reactor_test.go b/internal/consensus/reactor_test.go index 8a9abdf43..c6a8869db 100644 --- a/internal/consensus/reactor_test.go +++ b/internal/consensus/reactor_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math/rand" "os" "sync" "testing" @@ -776,8 +777,8 @@ func TestReactorValidatorSetChanges(t *testing.T) { cfg := configSetup(t) - nPeers := 7 - nVals := 4 + nPeers := 4 + nVals := 2 states, _, _, cleanup := randConsensusNetWithPeers( ctx, t, @@ -870,60 +871,27 @@ func TestReactorValidatorSetChanges(t *testing.T) { // it includes the commit for block 4, which should have the updated validator set waitForBlockWithUpdatedValsAndValidateIt(ctx, t, nPeers, activeVals, blocksSubs, states) - updateValidatorPubKey1, err := states[nVals].privValidator.GetPubKey(ctx) - require.NoError(t, err) + for i := 2; i <= 32; i *= 2 { + useState := rand.Intn(nVals) + t.Log(useState) + updateValidatorPubKey1, err := states[useState].privValidator.GetPubKey(ctx) + require.NoError(t, err) - updatePubKey1ABCI, err := encoding.PubKeyToProto(updateValidatorPubKey1) - require.NoError(t, err) + updatePubKey1ABCI, err := encoding.PubKeyToProto(updateValidatorPubKey1) + require.NoError(t, err) - updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25) - previousTotalVotingPower := states[nVals].GetRoundState().LastValidators.TotalVotingPower() + previousTotalVotingPower := states[useState].GetRoundState().LastValidators.TotalVotingPower() + updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, int64(i)) - waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states, updateValidatorTx1) - waitForAndValidateBlockWithTx(ctx, t, nPeers, activeVals, blocksSubs, states, updateValidatorTx1) - waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states) - waitForBlockWithUpdatedValsAndValidateIt(ctx, t, nPeers, activeVals, blocksSubs, states) + waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states, updateValidatorTx1) + waitForAndValidateBlockWithTx(ctx, t, nPeers, activeVals, blocksSubs, states, updateValidatorTx1) + waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states) + waitForBlockWithUpdatedValsAndValidateIt(ctx, t, nPeers, activeVals, blocksSubs, states) - require.NotEqualf( - t, states[nVals].GetRoundState().LastValidators.TotalVotingPower(), previousTotalVotingPower, - "expected voting power to change (before: %d, after: %d)", - previousTotalVotingPower, states[nVals].GetRoundState().LastValidators.TotalVotingPower(), - ) - - newValidatorPubKey2, err := states[nVals+1].privValidator.GetPubKey(ctx) - require.NoError(t, err) - - newVal2ABCI, err := encoding.PubKeyToProto(newValidatorPubKey2) - require.NoError(t, err) - - newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower) - - newValidatorPubKey3, err := states[nVals+2].privValidator.GetPubKey(ctx) - require.NoError(t, err) - - newVal3ABCI, err := encoding.PubKeyToProto(newValidatorPubKey3) - require.NoError(t, err) - - newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower) - - waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states, newValidatorTx2, newValidatorTx3) - waitForAndValidateBlockWithTx(ctx, t, nPeers, activeVals, blocksSubs, states, newValidatorTx2, newValidatorTx3) - waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states) - - activeVals[string(newValidatorPubKey2.Address())] = struct{}{} - activeVals[string(newValidatorPubKey3.Address())] = 
struct{}{} - - waitForBlockWithUpdatedValsAndValidateIt(ctx, t, nPeers, activeVals, blocksSubs, states) - - removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0) - removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0) - - waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states, removeValidatorTx2, removeValidatorTx3) - waitForAndValidateBlockWithTx(ctx, t, nPeers, activeVals, blocksSubs, states, removeValidatorTx2, removeValidatorTx3) - waitForAndValidateBlock(ctx, t, nPeers, activeVals, blocksSubs, states) - - delete(activeVals, string(newValidatorPubKey2.Address())) - delete(activeVals, string(newValidatorPubKey3.Address())) - - waitForBlockWithUpdatedValsAndValidateIt(ctx, t, nPeers, activeVals, blocksSubs, states) + require.NotEqualf( + t, states[useState].GetRoundState().LastValidators.TotalVotingPower(), previousTotalVotingPower, + "expected voting power to change (before: %d, after: %d)", + previousTotalVotingPower, states[useState].GetRoundState().LastValidators.TotalVotingPower(), + ) + } } diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index 4809df32e..1d952d30e 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -133,7 +133,7 @@ func (r *Reactor) handleEvidenceMessage(ctx context.Context, envelope *p2p.Envel // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // It will handle errors and any possible panics gracefully. A caller can handle // any error returned by sending a PeerError on the respective channel. -func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) { +func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (err error) { defer func() { if e := recover(); e != nil { err = fmt.Errorf("panic in processing message: %v", e) @@ -147,15 +147,14 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop r.logger.Debug("received message", "message", envelope.Message, "peer", envelope.From) - switch chID { + switch envelope.ChannelID { case EvidenceChannel: err = r.handleEvidenceMessage(ctx, envelope) - default: - err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) + err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope) } - return err + return } // processEvidenceCh implements a blocking event loop where we listen for p2p @@ -164,8 +163,8 @@ func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel iter := evidenceCh.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() - if err := r.handleMessage(ctx, evidenceCh.ID, envelope); err != nil { - r.logger.Error("failed to process message", "ch_id", evidenceCh.ID, "envelope", envelope, "err", err) + if err := r.handleMessage(ctx, envelope); err != nil { + r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err) if serr := evidenceCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, diff --git a/internal/mempool/reactor.go b/internal/mempool/reactor.go index 8f83f5006..ae578e70a 100644 --- a/internal/mempool/reactor.go +++ b/internal/mempool/reactor.go @@ -169,7 +169,7 @@ func (r *Reactor) handleMempoolMessage(ctx context.Context, envelope *p2p.Envelo // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. // It will handle errors and any possible panics gracefully. 
A caller can handle // any error returned by sending a PeerError on the respective channel. -func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) { +func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (err error) { defer func() { if e := recover(); e != nil { r.observePanic(e) @@ -184,15 +184,14 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop r.logger.Debug("received message", "peer", envelope.From) - switch chID { + switch envelope.ChannelID { case MempoolChannel: err = r.handleMempoolMessage(ctx, envelope) - default: - err = fmt.Errorf("unknown channel ID (%d) for envelope (%T)", chID, envelope.Message) + err = fmt.Errorf("unknown channel ID (%d) for envelope (%T)", envelope.ChannelID, envelope.Message) } - return err + return } // processMempoolCh implements a blocking event loop where we listen for p2p @@ -201,8 +200,8 @@ func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel) iter := mempoolCh.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() - if err := r.handleMessage(ctx, mempoolCh.ID, envelope); err != nil { - r.logger.Error("failed to process message", "ch_id", mempoolCh.ID, "envelope", envelope, "err", err) + if err := r.handleMessage(ctx, envelope); err != nil { + r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err) if serr := mempoolCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, diff --git a/internal/p2p/channel.go b/internal/p2p/channel.go index 8e6774612..d3d7d104f 100644 --- a/internal/p2p/channel.go +++ b/internal/p2p/channel.go @@ -59,25 +59,17 @@ type Channel struct { outCh chan<- Envelope // outbound messages (reactors to peers) errCh chan<- PeerError // peer error reporting - messageType proto.Message // the channel's message type, used for unmarshaling - name string + name string } // NewChannel creates a new channel. It is primarily for internal and test // use, reactors should use Router.OpenChannel(). 
-func NewChannel( - id ChannelID, - messageType proto.Message, - inCh <-chan Envelope, - outCh chan<- Envelope, - errCh chan<- PeerError, -) *Channel { +func NewChannel(id ChannelID, inCh <-chan Envelope, outCh chan<- Envelope, errCh chan<- PeerError) *Channel { return &Channel{ - ID: id, - messageType: messageType, - inCh: inCh, - outCh: outCh, - errCh: errCh, + ID: id, + inCh: inCh, + outCh: outCh, + errCh: errCh, } } diff --git a/internal/p2p/p2ptest/require.go b/internal/p2p/p2ptest/require.go index f9f3ec40e..885e080d4 100644 --- a/internal/p2p/p2ptest/require.go +++ b/internal/p2p/p2ptest/require.go @@ -123,7 +123,7 @@ func RequireError(ctx context.Context, t *testing.T, channel *p2p.Channel, peerE err := channel.SendError(tctx, peerError) switch { case errors.Is(err, context.DeadlineExceeded): - require.Fail(t, "timed out reporting error", "%v on %v", peerError, channel.ID) + require.Fail(t, "timed out reporting error", "%v for %q", peerError, channel.String()) default: require.NoError(t, err, "unexpected error") } diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index 1c80763ee..bd4737326 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -193,7 +193,7 @@ func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) { dur, err := r.handlePexMessage(ctx, envelope, pexCh) if err != nil { r.logger.Error("failed to process message", - "ch_id", pexCh.ID, "envelope", envelope, "err", err) + "ch_id", envelope.ChannelID, "envelope", envelope, "err", err) if serr := pexCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, diff --git a/internal/p2p/pex/reactor_test.go b/internal/p2p/pex/reactor_test.go index f2132fbba..ec2f03d83 100644 --- a/internal/p2p/pex/reactor_test.go +++ b/internal/p2p/pex/reactor_test.go @@ -289,7 +289,6 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor { pexErrCh := make(chan p2p.PeerError, chBuf) pexCh := p2p.NewChannel( p2p.ChannelID(pex.PexChannel), - new(p2pproto.PexMessage), pexInCh, pexOutCh, pexErrCh, diff --git a/internal/p2p/router.go b/internal/p2p/router.go index df096dbb6..459be7975 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -262,7 +262,7 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*C queue := r.queueFactory(chDesc.RecvBufferCapacity) outCh := make(chan Envelope, chDesc.RecvBufferCapacity) errCh := make(chan PeerError, chDesc.RecvBufferCapacity) - channel := NewChannel(id, messageType, queue.dequeue(), outCh, errCh) + channel := NewChannel(id, queue.dequeue(), outCh, errCh) channel.name = chDesc.Name var wrapper Wrapper diff --git a/internal/statesync/dispatcher_test.go b/internal/statesync/dispatcher_test.go index 65c517be4..8ec074bd1 100644 --- a/internal/statesync/dispatcher_test.go +++ b/internal/statesync/dispatcher_test.go @@ -30,7 +30,7 @@ func testChannel(size int) (*channelInternal, *p2p.Channel) { Out: make(chan p2p.Envelope, size), Error: make(chan p2p.PeerError, size), } - return in, p2p.NewChannel(0, nil, in.In, in.Out, in.Error) + return in, p2p.NewChannel(0, in.In, in.Out, in.Error) } func TestDispatcherBasic(t *testing.T) { diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index fb2749a02..f4d72d017 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -305,7 +305,12 @@ func (r *Reactor) OnStart(ctx context.Context) error { return nil } - go r.processChannels(ctx, snapshotCh, chunkCh, blockCh, paramsCh) + go r.processChannels(ctx, 
map[p2p.ChannelID]*p2p.Channel{ + SnapshotChannel: snapshotCh, + ChunkChannel: chunkCh, + LightBlockChannel: blockCh, + ParamsChannel: paramsCh, + }) go r.processPeerUpdates(ctx, r.peerEvents(ctx)) if r.needsStateSync { @@ -661,7 +666,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel "failed to add snapshot", "height", msg.Height, "format", msg.Format, - "channel", snapshotCh.ID, + "channel", envelope.ChannelID, "err", err, ) return nil @@ -907,15 +912,14 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, cha // encountered during message execution will result in a PeerError being sent on // the respective channel. When the reactor is stopped, we will catch the signal // and close the p2p Channel gracefully. -func (r *Reactor) processChannels(ctx context.Context, chs ...*p2p.Channel) { +func (r *Reactor) processChannels(ctx context.Context, chanTable map[p2p.ChannelID]*p2p.Channel) { // make sure that the iterator gets cleaned up in case of error ctx, cancel := context.WithCancel(ctx) defer cancel() - chanTable := make(map[p2p.ChannelID]*p2p.Channel, len(chs)) - for idx := range chs { - ch := chs[idx] - chanTable[ch.ID] = ch + chs := make([]*p2p.Channel, 0, len(chanTable)) + for key := range chanTable { + chs = append(chs, chanTable[key]) } iter := p2p.MergedChannelIterator(ctx, chs...) diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index c6b2c2d2b..55a9fcf8c 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -102,7 +102,6 @@ func setup( rts.snapshotChannel = p2p.NewChannel( SnapshotChannel, - new(ssproto.Message), rts.snapshotInCh, rts.snapshotOutCh, rts.snapshotPeerErrCh, @@ -110,7 +109,6 @@ func setup( rts.chunkChannel = p2p.NewChannel( ChunkChannel, - new(ssproto.Message), rts.chunkInCh, rts.chunkOutCh, rts.chunkPeerErrCh, @@ -118,7 +116,6 @@ func setup( rts.blockChannel = p2p.NewChannel( LightBlockChannel, - new(ssproto.Message), rts.blockInCh, rts.blockOutCh, rts.blockPeerErrCh, @@ -126,7 +123,6 @@ func setup( rts.paramsChannel = p2p.NewChannel( ParamsChannel, - new(ssproto.Message), rts.paramsInCh, rts.paramsOutCh, rts.paramsPeerErrCh, diff --git a/privval/grpc/client_test.go b/privval/grpc/client_test.go index 0b1056d03..a81f6ce07 100644 --- a/privval/grpc/client_test.go +++ b/privval/grpc/client_test.go @@ -29,7 +29,7 @@ func dialer(t *testing.T, pv types.PrivValidator, logger log.Logger) (*grpc.Serv server := grpc.NewServer() - s := tmgrpc.NewSignerServer(chainID, pv, logger) + s := tmgrpc.NewSignerServer(logger, chainID, pv) privvalproto.RegisterPrivValidatorAPIServer(server, s) diff --git a/privval/grpc/server.go b/privval/grpc/server.go index 13e0c9073..40af82479 100644 --- a/privval/grpc/server.go +++ b/privval/grpc/server.go @@ -21,11 +21,9 @@ type SignerServer struct { privVal types.PrivValidator } -func NewSignerServer(chainID string, - privVal types.PrivValidator, log log.Logger) *SignerServer { - +func NewSignerServer(logger log.Logger, chainID string, privVal types.PrivValidator) *SignerServer { return &SignerServer{ - logger: log, + logger: logger, chainID: chainID, privVal: privVal, } @@ -56,8 +54,7 @@ func (ss *SignerServer) GetPubKey(ctx context.Context, req *privvalproto.PubKeyR // SignVote receives a vote sign requests, attempts to sign it // returns SignedVoteResponse on success and error on failure -func (ss *SignerServer) SignVote(ctx context.Context, req *privvalproto.SignVoteRequest) ( - 
*privvalproto.SignedVoteResponse, error) { +func (ss *SignerServer) SignVote(ctx context.Context, req *privvalproto.SignVoteRequest) (*privvalproto.SignedVoteResponse, error) { vote := req.Vote err := ss.privVal.SignVote(ctx, req.ChainId, vote) @@ -72,8 +69,7 @@ func (ss *SignerServer) SignVote(ctx context.Context, req *privvalproto.SignVote // SignProposal receives a proposal sign requests, attempts to sign it // returns SignedProposalResponse on success and error on failure -func (ss *SignerServer) SignProposal(ctx context.Context, req *privvalproto.SignProposalRequest) ( - *privvalproto.SignedProposalResponse, error) { +func (ss *SignerServer) SignProposal(ctx context.Context, req *privvalproto.SignProposalRequest) (*privvalproto.SignedProposalResponse, error) { proposal := req.Proposal err := ss.privVal.SignProposal(ctx, req.ChainId, proposal) diff --git a/privval/grpc/server_test.go b/privval/grpc/server_test.go index db85a42b7..ad63b87a6 100644 --- a/privval/grpc/server_test.go +++ b/privval/grpc/server_test.go @@ -37,7 +37,7 @@ func TestGetPubKey(t *testing.T) { defer cancel() logger := log.NewTestingLogger(t) - s := tmgrpc.NewSignerServer(ChainID, tc.pv, logger) + s := tmgrpc.NewSignerServer(logger, ChainID, tc.pv) req := &privvalproto.PubKeyRequest{ChainId: ChainID} resp, err := s.GetPubKey(ctx, req) @@ -112,7 +112,7 @@ func TestSignVote(t *testing.T) { defer cancel() logger := log.NewTestingLogger(t) - s := tmgrpc.NewSignerServer(ChainID, tc.pv, logger) + s := tmgrpc.NewSignerServer(logger, ChainID, tc.pv) req := &privvalproto.SignVoteRequest{ChainId: ChainID, Vote: tc.have.ToProto()} resp, err := s.SignVote(ctx, req) @@ -183,7 +183,7 @@ func TestSignProposal(t *testing.T) { defer cancel() logger := log.NewTestingLogger(t) - s := tmgrpc.NewSignerServer(ChainID, tc.pv, logger) + s := tmgrpc.NewSignerServer(logger, ChainID, tc.pv) req := &privvalproto.SignProposalRequest{ChainId: ChainID, Proposal: tc.have.ToProto()} resp, err := s.SignProposal(ctx, req) diff --git a/spec/consensus/proposer-based-timestamp/pbts-sysmodel_002_draft.md b/spec/consensus/proposer-based-timestamp/pbts-sysmodel_002_draft.md index 832d11c9a..d6fcb54b6 100644 --- a/spec/consensus/proposer-based-timestamp/pbts-sysmodel_002_draft.md +++ b/spec/consensus/proposer-based-timestamp/pbts-sysmodel_002_draft.md @@ -1,16 +1,31 @@ # PBTS: System Model and Properties +## Outline + + - [System model](#system-model) + - [Synchronized clocks](#synchronized-clocks) + - [Message delays](#message-delays) + - [Problem Statement](#problem-statement) + - [Protocol Analysis - Timely Proposals](#protocol-analysis---timely-proposals) + - [Timely Proof-of-Locks](#timely-proof-of-locks) + - [Derived Proof-of-Locks](#derived-proof-of-locks) + - [Temporal Analysis](#temporal-analysis) + - [Safety](#safety) + - [Liveness](#liveness) + ## System Model #### **[PBTS-CLOCK-NEWTON.0]** -There is a reference Newtonian real-time `t` (UTC). +There is a reference Newtonian real-time `t`. No process has direct access to this reference time, used only for specification purposes. +The reference real-time is assumed to be aligned with the Coordinated Universal Time (UTC). ### Synchronized clocks -Processes are assumed to be equipped with synchronized clocks. +Processes are assumed to be equipped with synchronized clocks, +aligned with the Coordinated Universal Time (UTC). This requires processes to periodically synchronize their local clocks with an external and trusted source of the time (e.g. NTP servers). 
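A back-of-the-envelope sketch of why the periodic resynchronization mentioned above keeps clocks usable. The drift and residual-error constants below are purely illustrative assumptions, not values from this specification:

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative parameters only; not part of the specification.
const (
	driftPerSecond = 50 * time.Microsecond // assumed worst-case clock drift per second
	residualError  = 1 * time.Millisecond  // assumed error right after a resync (e.g. via NTP)
)

// worstCaseOffset bounds |C_p(t) - t| for a clock that is resynchronized
// with a trusted time source every syncInterval.
func worstCaseOffset(syncInterval time.Duration) time.Duration {
	drift := time.Duration(syncInterval.Seconds() * float64(driftPerSecond))
	return residualError + drift
}

func main() {
	// Resyncing every 10 minutes keeps the offset within ~31ms here.
	fmt.Println(worstCaseOffset(10 * time.Minute))
}
```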
@@ -27,43 +42,35 @@ and drifts of local clocks from real time.

#### **[PBTS-CLOCK-PRECISION.0]**

There exists a system parameter `PRECISION`, such that
-for any two processes `p` and `q`, with local clocks `C_p` and `C_q`,
-that read their local clocks at the same real-time `t`, we have:
+for any two processes `p` and `q`, with local clocks `C_p` and `C_q`:

-- If `p` and `q` are equipped with synchronized clocks, then `|C_p(t) - C_q(t)| < PRECISION`
+- If `p` and `q` are equipped with synchronized clocks,
+  then for any real-time `t` we have `|C_p(t) - C_q(t)| <= PRECISION`.

`PRECISION` thus bounds the difference on the times simultaneously read by processes
from their local clocks,
so that their clocks can be considered synchronized.

#### Accuracy

-The [first draft][sysmodel_v1] of this specification included a second clock-related parameter, `ACCURACY`,
-that relates the values read by processes from their synchronized clocks with real time:
+A second relevant clock parameter is accuracy, which relates the values read by
+processes from their clocks to real time.

-- If `p` is a process is equipped with a synchronized clock, then at real time
-  `t` it reads from its clock time `C_p(t)` with `|C_p(t) - t| < ACCURACY`
+##### **[PBTS-CLOCK-ACCURACY.0]**

-The adoption of `ACCURACY` as the upper bound on the difference between clock
-readings and real time, however, renders the `PRECISION` parameter redundant.
-In fact, if we assume that clocks readings are at most `ACCURACY` from real
-time, we would therefore be assuming that they cannot be more than `2 * ACCURACY`
-apart from each other, thus establishing a worst-case upper bound for `PRECISION`.
-
-The approach we take is to assume that processes clocks are periodically
-synchronized with an external source of time, thus improving their accuracy.
-This allows us to adopt a relaxed version of the above `ACCURACY` definition:
-
-##### **[PBTS-CLOCK-FAIR.0]**
+For the sake of completeness, we define a parameter `ACCURACY` such that:

- At real time `t` there is at least one correct process `p` which clock marks
-  `C_p(t)` with `|C_p(t) - t| < ACCURACY`
+  `C_p(t)` with `|C_p(t) - t| <= ACCURACY`.

-Then, through [PBTS-CLOCK-PRECISION] we can extend this relation of clock times
-with real time to every correct process, which will have a clock with accuracy
-bound by `ACCURACY + PRECISION`.
-But, for the sake of simpler specification we can assume that the `PRECISION`,
-which is a worst-case parameter that applies to all correct processes,
-includes the best `ACCURACY` achieved by any of them.
+As a consequence, applying the definition of `PRECISION`, we have:
+
+- At real time `t` the synchronized clock of any correct process `p` marks
+  `C_p(t)` with `|C_p(t) - t| <= ACCURACY + PRECISION`.
+
+The reason for not adopting `ACCURACY` as a system parameter is the assumption
+that `PRECISION >> ACCURACY`.
+This allows us to consider, for practical purposes, that the `PRECISION` system
+parameter embodies the `ACCURACY` model parameter.
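To make the clock relations concrete, here is a minimal Go sketch that models local clocks as real time plus a bounded offset and checks the [PBTS-CLOCK-PRECISION.0] relation; the offsets and the `PRECISION` value are illustrative assumptions:

```go
package main

import (
	"fmt"
	"time"
)

// clock models C_p as real time plus a bounded offset.
type clock struct{ offset time.Duration }

func (c clock) read(realTime time.Time) time.Time { return realTime.Add(c.offset) }

func main() {
	precision := 500 * time.Millisecond
	p := clock{offset: 120 * time.Millisecond}  // |C_p(t) - t| = 120ms
	q := clock{offset: -200 * time.Millisecond} // |C_q(t) - t| = 200ms

	t := time.Now() // the reference real time, inaccessible to processes
	diff := p.read(t).Sub(q.read(t))
	if diff < 0 {
		diff = -diff
	}
	// [PBTS-CLOCK-PRECISION.0]: |C_p(t) - C_q(t)| <= PRECISION
	fmt.Printf("|C_p(t) - C_q(t)| = %v, synchronized: %v\n", diff, diff <= precision)
}
```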
### Message Delays

@@ -79,172 +86,264 @@ defining a lower bound, a *minimum time* that a correct process assigns to propo
While *minimum delay* for delivering a proposal to a destination allows
defining an upper bound, the *maximum time* assigned to a proposal.

-#### **[PBTS-MSG-D.0]**
+#### **[PBTS-MSG-DELAY.0]**

-There exists a system parameter `MSGDELAY` for end-to-end delays of messages carrying proposals,
-such for any two correct processes `p` and `q`, and any real time `t`:
+There exists a system parameter `MSGDELAY` for end-to-end delays of proposal messages,
+such that for any two correct processes `p` and `q`:

-- If `p` sends a message `m` carrying a proposal at time `ts`,
-then if `q` receives the message and learns the proposal,
-`q` does that at time `t` such that `ts <= t <= ts + MSGDELAY`.
+- If `p` sends a proposal message `m` at real time `t` and `q` receives `m` at
+  real time `t'`, then `t <= t' <= t + MSGDELAY`.

-While we don't want to impose particular restrictions regarding the format of `m`,
-we need to assume that their size is upper bounded.
-In practice, using messages with a fixed-size to carry proposals allows
-for a more accurate estimation of `MSGDELAY`, and therefore is advised.
+Notice that, as a system parameter, `MSGDELAY` should be observed for any
+proposal message broadcast by correct processes: it is a *worst-case* parameter.
+As message delays depend on the message size, the above requirement implicitly
+indicates that the size of proposal messages is either fixed or upper bounded.

## Problem Statement

In this section we define the properties of Tendermint consensus
-(cf. the [arXiv paper][arXiv]) in this new system model.
+(cf. the [arXiv paper][arXiv]) in this system model.

-#### **[PBTS-PROPOSE.0]**
+### **[PBTS-PROPOSE.0]**

-A proposer proposes a consensus value `v` with an associated proposal time `v.time`.
+A proposer proposes a consensus value `v` that includes a proposal time
+`v.time`.
+
+> We then restrict the allowed decisions along the following lines:

#### **[PBTS-INV-AGREEMENT.0]**

-[Agreement] No two correct processes decide on different values `v`. (This implies that no two correct processes decide on different proposal times `v.time`.)
+- [Agreement] No two correct processes decide on different values `v`.
+
+This implies that no two correct processes decide on different proposal times
+`v.time`.

#### **[PBTS-INV-VALID.0]**

-[Validity] If a correct process decides on value `v`,
-then `v` satisfies a predefined `valid` predicate.
+- [Validity] If a correct process decides on value `v`, then `v` satisfies a
+  predefined `valid` predicate.
+
+With respect to PBTS, the `valid` predicate requires proposal times to be
+[monotonic](./pbts-algorithm_002_draft.md#time-monotonicity) over heights of
+consensus:
+
+##### **[PBTS-INV-MONOTONICITY.0]**
+
+- If a correct process decides on value `v` at the height `h` of consensus,
+  thus setting `decision[h] = v`, then `v.time > decision[h'].time` for all
+  previous heights `h' < h`.
+
+The monotonicity of proposal times, and external validity in general,
+implicitly assumes that heights of consensus are executed in order.
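The monotonicity requirement is easy to express in code. A sketch under the assumption of a hypothetical `decisionTime` log mapping decided heights to their proposal times:

```go
package main

import (
	"fmt"
	"time"
)

// decisionTime records, per decided height, the decided proposal time
// (a hypothetical stand-in for decision[h].time).
var decisionTime = map[int64]time.Time{}

// monotonic reports whether vTime respects [PBTS-INV-MONOTONICITY.0] at
// height h. Since heights are executed in order, comparing against the
// latest decided height suffices; the scan mirrors the quantifier above.
func monotonic(h int64, vTime time.Time) bool {
	for prev, t := range decisionTime {
		if prev < h && !vTime.After(t) {
			return false
		}
	}
	return true
}

func main() {
	decisionTime[1] = time.Unix(100, 0)
	decisionTime[2] = time.Unix(105, 0)
	fmt.Println(monotonic(3, time.Unix(104, 0))) // false: not after decision[2].time
	fmt.Println(monotonic(3, time.Unix(106, 0))) // true
}
```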
#### **[PBTS-INV-TIMELY.0]**

-[Time-Validity] If a correct process decides on value `v`,
-then the associated proposal time `v.time` satisfies a predefined `timely` predicate.
+- [Time-Validity] If a correct process decides on value `v`, then the proposal
+  time `v.time` was considered `timely` by at least one correct process.

-> Both [Validity] and [Time-Validity] must be observed even if up to `2f` validators are faulty.
+PBTS introduces a `timely` predicate that restricts the allowed decisions based
+on the proposal time `v.time` associated with a proposed value `v`.
+As a synchronous predicate, the time at which it is evaluated affects
+whether a process accepts or rejects a proposal time.
+For this reason, the Time-Validity property refers to the previous evaluation
+of the `timely` predicate, detailed in the following section.

-### Timely proposals
+## Protocol Analysis - Timely Proposals

+For PBTS, a `proposal` is a tuple `(v, v.time, v.round)`, where:
+
+- `v` is the proposed value;
+- `v.time` is the associated proposal time;
+- `v.round` is the round at which `v` was first proposed.
+
+We include the proposal round `v.round` in the proposal definition because a
+value `v` and its associated proposal time `v.time` can be proposed in multiple
+rounds, but the evaluation of the `timely` predicate is only relevant at round
+`v.round`.
+
+> Considering the algorithm in the [arXiv paper][arXiv], a new proposal is
+> produced by the `getValue()` method, invoked by the proposer `p` of round
+> `round_p` when starting its proposing round with a nil `validValue_p`.
+> The first round at which a value `v` is proposed is then the round at which
+> the proposal for `v` was produced, and broadcast in a `PROPOSAL` message with
+> `vr = -1`.
+
+#### **[PBTS-PROPOSAL-RECEPTION.0]**

The `timely` predicate is evaluated when a process receives a proposal.
-Let `now_p` be time a process `p` reads from its local clock when `p` receives a proposal.
-Let `v` be the proposed value and `v.time` the proposal time.
-The proposal is considered `timely` by `p` if:
+More precisely, let `p` be a correct process:

-#### **[PBTS-RECEPTION-STEP.1]**
+- `proposalReceptionTime(p,r)` is the time `p` reads from its local clock when
+  `p` is at round `r` and receives the proposal of round `r`.

-1. `now_p >= v.time - PRECISION` and
-1. `now_p <= v.time + MSGDELAY + PRECISION`
+#### **[PBTS-TIMELY.0]**
+
+The proposal `(v, v.time, v.round)` is considered `timely` by a correct process
+`p` if:
+
+1. `proposalReceptionTime(p,v.round)` is set, and
+1. `proposalReceptionTime(p,v.round) >= v.time - PRECISION`, and
+1. `proposalReceptionTime(p,v.round) <= v.time + MSGDELAY + PRECISION`.
+
+A correct process at round `v.round` only sends a `PREVOTE` for `v` if the
+associated proposal time `v.time` is considered `timely`.
+
+> Considering the algorithm in the [arXiv paper][arXiv], the `timely` predicate
+> is evaluated by a process `p` when it receives a valid `PROPOSAL` message
+> from the proposer of the current round `round_p` with `vr = -1`.
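The three conditions of [PBTS-TIMELY.0] translate almost literally into code. A minimal Go sketch, assuming the caller has tracked the reception time of the round `v.round` proposal locally (a zero time standing for "not set"):

```go
package main

import "time"

// timely implements the three conditions of [PBTS-TIMELY.0] for a proposal
// time vTime, given the local reception time of the round-v.round proposal.
// A zero receptionTime means proposalReceptionTime(p, v.round) is not set.
func timely(receptionTime, vTime time.Time, precision, msgDelay time.Duration) bool {
	if receptionTime.IsZero() { // condition 1: the proposal was received
		return false
	}
	lower := vTime.Add(-precision)           // condition 2 bound
	upper := vTime.Add(msgDelay + precision) // condition 3 bound
	return !receptionTime.Before(lower) && !receptionTime.After(upper)
}
```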
### Timely Proof-of-Locks

-We denote by `POL(v,r)` a *Proof-of-Lock* of value `v` at the round `r` of consensus.
-`POL(v,r)` consists of a set of `PREVOTE` messages of round `r` for the value `v`
-from processes whose cumulative voting power is at least `2f + 1`.
+A *Proof-of-Lock* is a set of `PREVOTE` messages of a round of consensus for the
+same value from processes whose cumulative voting power is at least `2f + 1`.
+We denote as `POL(v,r)` a proof-of-lock of value `v` at round `r`.

-#### **[PBTS-TIMELY-POL.1]**
+For PBTS, we are particularly interested in the `POL(v,v.round)` produced in
+the round `v.round` at which a value `v` was first proposed.
+We call it a *timely* proof-of-lock for `v` because it can only be observed
+if at least one correct process considered it `timely`:
+
+#### **[PBTS-TIMELY-POL.0]**

If

-- there is a valid `POL(v,r*)` for height `h`, and
-- `r*` is the lowest-numbered round `r` of height `h` for which there is a valid `POL(v,r)`, and
-- `POL(v,r*)` contains a `PREVOTE` message from at least one correct process,
+- there is a valid `POL(v,r)` with `r = v.round`, and
+- `POL(v,v.round)` contains a `PREVOTE` message from at least one correct process,

-Then, where `p` is a such correct process:
+Then, let `p` be such a correct process:

-- `p` received a `PROPOSE` message of round `r*` and height `h`, and
-- the `PROPOSE` message contained a proposal for value `v` with proposal time `v.time`, and
-- a correct process `p` considered the proposal `timely`.
+- `p` received a `PROPOSAL` message of round `v.round`, and
+- the `PROPOSAL` message contained a proposal `(v, v.time, v.round)`, and
+- `p` was in round `v.round` and evaluated the proposal time `v.time` as `timely`.

-The round `r*` above defined will be, in most cases,
-the round in which `v` was originally proposed, and when `v.time` was assigned,
-using a `PROPOSE` message with `POLRound = -1`.
-In any case, at least one correct process must consider the proposal `timely` at round `r*`
-to enable a valid `POL(v,r*)` to be observed.
+The existence of such a correct process `p` is guaranteed provided that the
+voting power of Byzantine processes is bounded by `2f`.
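The existence argument is plain quorum arithmetic: a proof-of-lock carries at least `2f + 1` voting power while Byzantine processes hold at most `2f`, so at least one unit of correct voting power must be present. A small illustrative sketch (the vote structure and labels are hypothetical):

```go
package main

import "fmt"

// vote is a PREVOTE from a process with some voting power (illustrative).
type vote struct {
	from      string
	power     int64
	byzantine bool // ground-truth label, for illustration only
}

// correctPowerInPOL returns the voting power of correct processes in a
// proof-of-lock. [PBTS-TIMELY-POL.0] relies on this being >= 1 whenever
// the POL carries >= 2f+1 power and Byzantine power is bounded by 2f.
func correctPowerInPOL(pol []vote) int64 {
	var power int64
	for _, v := range pol {
		if !v.byzantine {
			power += v.power
		}
	}
	return power
}

func main() {
	// f = 1: POL power 3 (= 2f+1), Byzantine power 2 (= 2f).
	pol := []vote{{"a", 1, true}, {"b", 1, true}, {"c", 1, false}}
	fmt.Println(correctPowerInPOL(pol) >= 1) // true: at least one correct PREVOTE
}
```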
### Derived Proof-of-Locks

-#### **[PBTS-DERIVED-POL.1]**
+The existence of `POL(v,r)` is a requirement for the decision of `v` at round
+`r` of consensus.
+
+At the same time, the Time-Validity property establishes that if `v` is decided
+then a timely proof-of-lock `POL(v,v.round)` must have been produced.
+
+So, we need to demonstrate here that any valid `POL(v,r)` is either a timely
+proof-of-lock or it is derived from a timely proof-of-lock:
+
+#### **[PBTS-DERIVED-POL.0]**

If

-- there is a valid `POL(v,r)` for height `h`, and
+- there is a valid `POL(v,r)`, and
- `POL(v,r)` contains a `PREVOTE` message from at least one correct process,

Then

-- there is a valid `POL(v,r*)` for height `h`, with `r* <= r`, and
-- `POL(v,r*)` contains a `PREVOTE` message from at least one correct process, and
-- a correct process considered the proposal for `v` `timely` at round `r*`.
+- there is a valid `POL(v,v.round)` with `v.round <= r` which is a timely proof-of-lock.

-The above relation derives from a recursion on the round number `r`.
-It is trivially observed when `r = r*`, the base of the recursion,
-when a timely `POL(v,r*)` is obtained.
-We need to ensure that, once a timely `POL(v,r*)` is obtained,
-it is possible to obtain a valid `POL(v,r)` with `r > r*`,
-without the need of satisfying the `timely` predicate (again) in round `r`.
-In fact, since rounds are started in order, it is not likely that
-a proposal time `v.time`, assigned at round `r*`,
-will still be considered `timely` when the round `r > r*` is in progress.
+The above relation is trivially observed when `r = v.round`, as `POL(v,r)` must
+be a timely proof-of-lock.
+Notice that we cannot have `r < v.round`, as `v.round` is defined as the first
+round at which `v` was proposed.

-In other words, the algorithm should ensure that once a `POL(v,r*)` attests
-that the proposal for `v` is `timely`,
-further valid `POL(v,r)` with `r > r*` can be obtained,
-even though processes do not consider the proposal for `v` `timely` any longer.
+For `r > v.round` we need to demonstrate that if there is a valid `POL(v,r)`,
+then a timely `POL(v,v.round)` was previously obtained.
+We observe that a condition for observing a `POL(v,r)` is that the proposer of
+round `r` has broadcast a `PROPOSAL` message for `v`.
+As `r > v.round`, we can affirm that `v` was not produced in round `r`.
+Instead, by the protocol operation, `v` was a *valid value* for the proposer of
+round `r`, which means that the proposer has observed a `POL(v,vr)` with
+`vr < r`.
+The above operation considers a *correct* proposer, but since a `POL(v,r)` was
+produced (by hypothesis) we can affirm that at least one correct process (also)
+observed a `POL(v,vr)`.

-> This can be achieved if the proposer of round `r' > r*` proposes `v` in a `PROPOSE` message
-with `POLRound = r*`, and at least one correct processes is aware of a `POL(v,r*)`.
-> From this point, if a valid `POL(v,r')` is achieved, it can replace the adopted `POL(v,r*)`.
+> Considering the algorithm in the [arXiv paper][arXiv], `v` was proposed by
+> the proposer `p` of round `round_p` because its `validValue_p` variable was
+> set to `v`.
+> The `PROPOSAL` message broadcast by the proposer, in this case, had `vr > -1`,
+> and it could only be accepted by processes that also observed a `POL(v,vr)`.

+Thus, if there is a `POL(v,r)` with `r > v.round`, then there is a valid
+`POL(v,vr)` with `v.round <= vr < r`.
+If `vr = v.round` then `POL(v,vr)` is a timely proof-of-lock and we are done.
+Otherwise, there is another valid `POL(v,vr')` with `v.round <= vr' < vr`,
+and the above reasoning can be recursively applied until we get `vr' = v.round`
+and observe a timely proof-of-lock.
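The recursive argument can be sketched as a walk over the rounds at which `v` was re-proposed as a valid value. The `polRounds` map below is hypothetical bookkeeping: for each round `r > v.round`, it records the round `vr` of the `POL(v,vr)` that justified proposing `v` at `r`:

```go
package main

import "fmt"

// derivesFromTimelyPOL follows the valid-value chain from round r back to
// v.round. polRounds[r] is the round vr of the POL(v,vr) that justified
// proposing v at round r (hypothetical record, for illustration only).
func derivesFromTimelyPOL(r, vRound int64, polRounds map[int64]int64) bool {
	for r > vRound {
		vr, ok := polRounds[r]
		if !ok || vr >= r || vr < vRound {
			return false
		}
		r = vr // recurse on the earlier proof-of-lock
	}
	return r == vRound // reached the timely POL(v, v.round)
}

func main() {
	// v.round = 2; POL(v,7) is derived via POL(v,5) and the timely POL(v,2).
	fmt.Println(derivesFromTimelyPOL(7, 2, map[int64]int64{7: 5, 5: 2})) // true
}
```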
-### SAFETY

-The safety of the algorithm requires a *timely* proof-of-lock for a decided value,
-either directly evaluated by a correct process,
-or indirectly received through a derived proof-of-lock.
+## Temporal Analysis

-#### **[PBTS-CONSENSUS-TIME-VALID.0]**
+In this section we present invariants that need to be observed for ensuring
+that PBTS is both safe and live.
+
+In addition to the variables and system parameters already defined, we use
+`beginRound(p,r)` as the value of process `p`'s local clock
+when it starts round `r` of consensus.
+
+### Safety
+
+The safety of PBTS requires that if a value `v` is decided, then at least one
+correct process `p` considered the associated proposal time `v.time` timely.
+Following the definition of [timely proposals](#pbts-timely0) and
+proof-of-locks, we require this condition to be asserted at a specific round of
+consensus, defined as `v.round`:
+
+#### **[PBTS-SAFETY.0]**

If

-- there is a valid commit `C` for height `k` and round `r`, and
+- there is a valid commit `C` for a value `v`, and
- `C` contains a `PRECOMMIT` message from at least one correct process

-Then, where `p` is one such correct process:
+then there is a correct process `p` (not necessarily the same process
+considered above) such that:

-- since `p` is correct, `p` received a valid `POL(v,r)`, and
-- `POL(v,r)` contains a `PREVOTE` message from at least one correct process, and
-- `POL(v,r)` is derived from a timely `POL(v,r*)` with `r* <= r`, and
-- `POL(v,r*)` contains a `PREVOTE` message from at least one correct process, and
-- a correct process considered a proposal for `v` `timely` at round `r*`.
+- `beginRound(p,v.round) <= proposalReceptionTime(p,v.round) <= beginRound(p,v.round+1)` and
+- `proposalReceptionTime(p,v.round) - MSGDELAY - PRECISION <= v.time <= proposalReceptionTime(p,v.round) + PRECISION`

-### LIVENESS
+That is, a correct process `p` started round `v.round` and, while still at
+round `v.round`, received a `PROPOSAL` message from round `v.round` proposing
+`v`.
+Moreover, the reception time of the original proposal for `v`, according to
+`p`'s local clock, enabled `p` to consider the proposal time `v.time` as
+`timely`.
+This is the requirement established by PBTS for issuing a `PREVOTE` for the
+proposal `(v, v.time, v.round)`, and thus for the eventual decision of `v`.
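A sketch of the two [PBTS-SAFETY.0] conditions as a predicate over the local readings of a single process `p`; all arguments are hypothetical local values, named after the spec variables:

```go
package main

import (
	"fmt"
	"time"
)

// safetyWitness checks the two [PBTS-SAFETY.0] conditions for a process p,
// given p's local clock readings (all values hypothetical).
func safetyWitness(beginRound, beginNextRound, receptionTime, vTime time.Time,
	precision, msgDelay time.Duration) bool {
	// p received the round-v.round proposal while still at round v.round.
	receivedInRound := !receptionTime.Before(beginRound) && !receptionTime.After(beginNextRound)
	// v.time was within the window that makes the proposal timely for p.
	timelyAtReception := !vTime.Before(receptionTime.Add(-msgDelay-precision)) &&
		!vTime.After(receptionTime.Add(precision))
	return receivedInRound && timelyAtReception
}

func main() {
	begin := time.Unix(100, 0)
	fmt.Println(safetyWitness(
		begin,                     // beginRound(p, v.round)
		begin.Add(10*time.Second), // beginRound(p, v.round+1)
		begin.Add(2*time.Second),  // proposalReceptionTime(p, v.round)
		begin.Add(1*time.Second),  // v.time
		500*time.Millisecond,      // PRECISION
		2*time.Second,             // MSGDELAY
	)) // true
}
```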
-In terms of liveness, we need to ensure that a proposal broadcast by a correct process
-will be considered `timely` by any correct process that is ready to accept that proposal.
-So, if:

### Liveness

-- the proposer `p` of a round `r` is correct,
-- there is no `POL(v',r')` for any value `v'` and any round `r' < r`,
-- `p` proposes a valid value `v` and sets `v.time` to the time it reads from its local clock,
+The liveness of PBTS relies on correct processes accepting proposal times
+assigned by correct proposers.
+We thus present a set of conditions for assigning a proposal time `v.time` so
+that every correct process should be able to issue a `PREVOTE` for `v`.

-Then let `q` be a correct process that receives `p`'s proposal, we have:
+#### **[PBTS-LIVENESS.0]**

-- `q` receives `p`'s proposal after its clock reads `v.time - PRECISION`, and
-- if `q` is at or joins round `r` while `p`'s proposal is being transmitted,
-then `q` receives `p`'s proposal before its clock reads `v.time + MSGDELAY + PRECISION`
+If

-> Note that, before `GST`, we cannot ensure that every correct process receives `p`'s proposals, nor that it does it while ready to accept a round `r` proposal.
+- the proposer of a round `r` of consensus is correct

-A correct process `q` as above defined must then consider `p`'s proposal `timely`.
+- and it proposes a value `v` for the first time, with associated proposal time `v.time`

-It will then broadcast a `PREVOTE` message for `v` at round `r`,
-thus enabling, from the Time-Validity point of view, `v` to be eventually decided.
+then the proposal `(v, v.time, r)` is accepted by every correct process provided that:

-#### Under-estimated `MSGDELAY`s
+- `min{p is correct : beginRound(p,r)} <= v.time <= max{p is correct : beginRound(p,r)}` and

-The liveness assumptions of PBTS are conditioned by a conservative and clever
-choice of the timing parameters, specially of `MSGDELAY`.
+- `max{p is correct : beginRound(p,r)} <= v.time + MSGDELAY + PRECISION <= min{p is correct : beginRound(p,r+1)}`

-In fact, if the transmission delay for a message carrying a proposal is wrongly
-estimated, correct processes may never consider a valid proposal as `timely`.
+The first condition establishes a range of safe proposal times `v.time` for round `r`.
+This condition is trivially observed if a correct proposer `p` sets `v.time` to the time it
+reads from its clock when starting round `r` and proposing `v`.
+A `PROPOSAL` message sent by `p` at local time `v.time` should not be received
+by any correct process before its local clock reads `v.time - PRECISION`, so
+that condition 2 of [PBTS-TIMELY.0] is observed.

-To circumvent this liveness issue, which could result from a misconfiguration,
-we assume that the `MSGDELAY` parameter can be increased as rounds do not
-succeed on deciding a value, possibly because no proposal is considered
-`timely` by enough processes.
-The precise behavior for this workaround is under [discussion](https://github.com/tendermint/spec/issues/371).
+The second condition establishes that every correct process should start round
+`v.round` at a local time that allows `v.time` to still be considered timely,
+according to condition 3 of [PBTS-TIMELY.0].
+In addition, it requires correct processes to stay long enough in round
+`v.round` so that they can receive the `PROPOSAL` message of round `v.round`.
+It is assumed here that the proposer of `v` broadcasts a `PROPOSAL` message at
+time `v.time`, according to its local clock, so that every correct process
+should receive this message by time `v.time + MSGDELAY + PRECISION`, according
+to their local clocks.

Back to [main document][main].
diff --git a/test/e2e/node/main.go b/test/e2e/node/main.go
index 54b9ef533..2cbb9e4b0 100644
--- a/test/e2e/node/main.go
+++ b/test/e2e/node/main.go
@@ -248,7 +248,7 @@ func startSigner(ctx context.Context, logger log.Logger, cfg *Config) error {
	if err != nil {
		return err
	}
-	ss := grpcprivval.NewSignerServer(cfg.ChainID, filePV, logger)
+	ss := grpcprivval.NewSignerServer(logger, cfg.ChainID, filePV)

	s := grpc.NewServer()
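Apart from the spec rewrite, the diffs above also reorder the `NewSignerServer` arguments so that the logger comes first. A minimal sketch of wiring the signer into a gRPC server with the new signature; the import paths follow the files touched above and may differ across versions:

```go
package signer

import (
	"google.golang.org/grpc"

	"github.com/tendermint/tendermint/libs/log"
	grpcprivval "github.com/tendermint/tendermint/privval/grpc"
	privvalproto "github.com/tendermint/tendermint/proto/tendermint/privval"
	"github.com/tendermint/tendermint/types"
)

// New wires a SignerServer into a gRPC server using the reordered
// constructor (logger first), mirroring startSigner in the e2e node above.
func New(logger log.Logger, chainID string, pv types.PrivValidator) *grpc.Server {
	ss := grpcprivval.NewSignerServer(logger, chainID, pv)
	s := grpc.NewServer()
	privvalproto.RegisterPrivValidatorAPIServer(s, ss)
	return s
}
```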