Compare commits

..

3 Commits

Author SHA1 Message Date
William Banfield
1b595a0689 continue implementing predicates 2022-08-18 12:58:04 -04:00
William Banfield
555f6d38e8 implement line 22 2022-02-18 19:16:03 -05:00
William Banfield
2a24bf1564 stepper: initial passing test 2022-02-18 17:36:18 -05:00
4 changed files with 210 additions and 14 deletions

View File

@@ -207,7 +207,7 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error {
//----------------------------------------
func (cli *socketClient) Flush(ctx context.Context) error {
reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush())
reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush(), true)
if err != nil {
return queueErr(err)
}
@@ -389,22 +389,29 @@ func (cli *socketClient) FinalizeBlock(
//----------------------------------------
// queueRequest enqueues req onto the queue. The request can break early if the
// the context is canceled. If the queue is full, this method blocks to allow
// the request to be placed onto the queue. This has the effect of creating an
// unbounded queue of goroutines waiting to write to this queue which is a bit
// antithetical to the purposes of a queue, however, undoing this behavior has
// dangerous upstream implications as a result of the usage of this behavior upstream.
// Remove at your peril.
// queueRequest enqueues req onto the queue. If the queue is full, it ether
// returns an error (sync=false) or blocks (sync=true).
//
// When sync=true, ctx can be used to break early. When sync=false, ctx will be
// used later to determine if request should be dropped (if ctx.Err is
// non-nil).
//
// The caller is responsible for checking cli.Error.
func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request) (*ReqRes, error) {
func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request, sync bool) (*ReqRes, error) {
reqres := NewReqRes(req)
select {
case cli.reqQueue <- reqres:
case <-ctx.Done():
return nil, ctx.Err()
if sync {
select {
case cli.reqQueue <- reqres:
case <-ctx.Done():
return nil, ctx.Err()
}
} else {
select {
case cli.reqQueue <- reqres:
default:
return nil, errors.New("buffer is full")
}
}
return reqres, nil
@@ -415,7 +422,7 @@ func (cli *socketClient) queueRequestAndFlush(
req *types.Request,
) (*ReqRes, error) {
reqres, err := cli.queueRequest(ctx, req)
reqres, err := cli.queueRequest(ctx, req, true)
if err != nil {
return nil, queueErr(err)
}

View File

@@ -0,0 +1,53 @@
package stepper
import (
"context"
"errors"
"github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/consensus/types"
)
var (
ErrNoValidTransition = errors.New("no valid transition")
)
var emptyTransition = func(types.RoundState) (types.RoundState, consensus.Message) {
return types.RoundState{}, &consensus.VoteMessage{}
}
type (
Transition func(types.RoundState) (types.RoundState, consensus.Message)
Predicate func(types.RoundState) bool
)
type Operation struct {
Name string
P Predicate
T Transition
}
type stepper struct {
ops []Operation
}
func New(ops []Operation) stepper {
return stepper{ops: ops}
}
func (s *stepper) Next(ctx context.Context, state types.RoundState) (types.RoundState, consensus.Message, error) {
if t, ok := s.pickTransition(state); ok {
s, msg := t(state)
return s, msg, nil
}
return types.RoundState{}, nil, ErrNoValidTransition
}
func (s *stepper) pickTransition(state types.RoundState) (Transition, bool) {
for _, op := range s.ops {
if op.P(state) {
return op.T, true
}
}
return emptyTransition, false
}

View File

@@ -0,0 +1,135 @@
package stepper_test
import (
"bytes"
"context"
"testing"
"github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/consensus/stepper"
"github.com/tendermint/tendermint/internal/consensus/types"
tmtypes "github.com/tendermint/tendermint/types"
)
func TestRunStep(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wasCalled := false
alwaysTrue := func(types.RoundState) bool {
return true
}
emptyTransition := func(types.RoundState) (types.RoundState, consensus.Message) {
wasCalled = true
return types.RoundState{}, &consensus.VoteMessage{}
}
ops := []stepper.Operation{
{
P: alwaysTrue,
T: emptyTransition,
},
}
s := stepper.New(ops)
_, _, err := s.Next(ctx, types.RoundState{})
if err != nil {
t.Fatalf("unexepected error from Next %v", err)
}
if !wasCalled {
t.Fatal("expected transition to be call")
}
}
var line22Predicate = func(s types.RoundState) bool {
return s.Step == types.RoundStepPropose &&
s.ProposalBlockParts.IsComplete() &&
s.Proposal.POLRound == -1 &&
s.Proposal.Round == s.Round &&
s.Proposal.Height == s.Height &&
bytes.Equal(s.Validators.GetProposer().Address, s.ProposalBlock.ProposerAddress)
}
var line22Transition = func(s types.RoundState) (types.RoundState, consensus.Message) {
msg := &consensus.VoteMessage{
Vote: &tmtypes.Vote{
Height: s.Height,
Round: s.Round,
BlockID: tmtypes.BlockID{},
},
}
if valid(s.ProposalBlock) && (s.LockedRound == -1 || s.LockedBlock != nil && s.LockedBlock.HashesTo(s.ProposalBlock.Hash())) {
msg = &consensus.VoteMessage{
Vote: &tmtypes.Vote{
Height: s.Height,
Round: s.Round,
BlockID: tmtypes.BlockID{Hash: s.ProposalBlockParts.Hash(), PartSetHeader: s.ProposalBlockParts.Header()},
},
}
}
s.Step = types.RoundStepPrevote
return s, msg
}
func valid(b *tmtypes.Block) bool {
return true
}
func TestLine22(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ops := []stepper.Operation{
{
P: line22Predicate,
T: line22Transition,
},
}
s := stepper.New(ops)
addr := []byte("proposer")
r := types.RoundState{
Round: 1,
Height: 1,
LockedRound: -1,
Step: types.RoundStepPropose,
Proposal: &tmtypes.Proposal{
POLRound: -1,
Round: 1,
Height: 1,
},
ProposalBlockParts: tmtypes.NewPartSetFromData([]byte("part set"), 5),
ProposalBlock: &tmtypes.Block{
Header: tmtypes.Header{
ProposerAddress: addr,
},
},
Validators: &tmtypes.ValidatorSet{
Validators: []*tmtypes.Validator{
{
Address: addr,
},
},
Proposer: &tmtypes.Validator{
Address: addr,
},
},
}
r, msg, err := s.Next(ctx, r)
if err != nil {
t.Fatalf("unexepected error from Next %v", err)
}
if msg == nil {
t.Fatalf("expected message to not be nil")
}
if vote, ok := msg.(*consensus.VoteMessage); ok {
if vote.Vote.BlockID.IsNil() {
t.Fatalf("expected vote to be for block")
}
} else {
t.Fatalf("expected message to not be vote")
}
if r.Step != types.RoundStepPrevote {
t.Fatalf("expected step to be %v but saw %v", types.RoundStepPrevote, r.Step)
}
}

View File

@@ -68,6 +68,7 @@ type RoundState struct {
Height int64 `json:"height,string"` // Height we are working on
Round int32 `json:"round"`
Step RoundStepType `json:"step"`
StepTime time.Time `json:"step_time"`
StartTime time.Time `json:"start_time"`
// Subjective time when +2/3 precommits for Block at Round were found