Merge branch 'cal/vote-extensions-1' into cal/vote-extensions-2

Callum Waters committed 2022-11-17 18:11:23 +01:00
321 changed files with 11626 additions and 7626 deletions

blocksync/metrics.gen.go (new file, +30)

@@ -0,0 +1,30 @@
// Code generated by metricsgen. DO NOT EDIT.

package blocksync

import (
"github.com/go-kit/kit/metrics/discard"
prometheus "github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
labels := []string{}
for i := 0; i < len(labelsAndValues); i += 2 {
labels = append(labels, labelsAndValues[i])
}
return &Metrics{
Syncing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "syncing",
Help: "Whether or not a node is block syncing. 1 if yes, 0 if no.",
}, labels).With(labelsAndValues...),
}
}

func NopMetrics() *Metrics {
return &Metrics{
Syncing: discard.NewGauge(),
}
}
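
Note how the generated constructor consumes the variadic arguments: the even-indexed entries become Prometheus label names, and the full list is bound as default label values via With. A minimal usage sketch (the "chain_id" label and its value are illustrative, not part of this diff):

// Hypothetical wiring: "chain_id" is a label name, "test-chain" its value.
// The gauge is registered as <namespace>_blocksync_syncing{chain_id="test-chain"}.
m := PrometheusMetrics("cometbft", "chain_id", "test-chain")
m.Syncing.Set(1) // node is block syncing
m.Syncing.Set(0) // node has caught up

// Tests and non-instrumented nodes use the no-op variant; every write is discarded.
nop := NopMetrics()
nop.Syncing.Set(1)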

blocksync/metrics.go (new file, +19)

@@ -0,0 +1,19 @@
package blocksync

import (
"github.com/go-kit/kit/metrics"
)

const (
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
// package.
MetricsSubsystem = "blocksync"
)

//go:generate go run ../scripts/metricsgen -struct=Metrics

// Metrics contains metrics exposed by this package.
type Metrics struct {
// Whether or not a node is block syncing. 1 if yes, 0 if no.
Syncing metrics.Gauge
}
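
The //go:generate directive ties the two files together: rerunning metricsgen re-emits metrics.gen.go from this struct, turning each field comment into the gauge's Help string. The reactor constructor changed later in this diff accepts the struct directly; a hedged wiring sketch, where prometheusEnabled and chainID are stand-ins rather than names from this commit:

// Hypothetical node wiring: pick real metrics when instrumentation is on,
// no-ops otherwise, and hand the result to the blocksync reactor.
var m *Metrics
if prometheusEnabled {
	m = PrometheusMetrics("cometbft", "chain_id", chainID)
} else {
	m = NopMetrics()
}
bcReactor := NewReactor(state, blockExec, blockStore, true, m)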


@@ -19,58 +19,6 @@ const (
BlockResponseMessageFieldKeySize
)
- // EncodeMsg encodes a Protobuf message
- func EncodeMsg(pb proto.Message) ([]byte, error) {
- msg := bcproto.Message{}
- switch pb := pb.(type) {
- case *bcproto.BlockRequest:
- msg.Sum = &bcproto.Message_BlockRequest{BlockRequest: pb}
- case *bcproto.BlockResponse:
- msg.Sum = &bcproto.Message_BlockResponse{BlockResponse: pb}
- case *bcproto.NoBlockResponse:
- msg.Sum = &bcproto.Message_NoBlockResponse{NoBlockResponse: pb}
- case *bcproto.StatusRequest:
- msg.Sum = &bcproto.Message_StatusRequest{StatusRequest: pb}
- case *bcproto.StatusResponse:
- msg.Sum = &bcproto.Message_StatusResponse{StatusResponse: pb}
- default:
- return nil, fmt.Errorf("unknown message type %T", pb)
- }
- bz, err := proto.Marshal(&msg)
- if err != nil {
- return nil, fmt.Errorf("unable to marshal %T: %w", pb, err)
- }
- return bz, nil
- }
- // DecodeMsg decodes a Protobuf message.
- func DecodeMsg(bz []byte) (proto.Message, error) {
- pb := &bcproto.Message{}
- err := proto.Unmarshal(bz, pb)
- if err != nil {
- return nil, err
- }
- switch msg := pb.Sum.(type) {
- case *bcproto.Message_BlockRequest:
- return msg.BlockRequest, nil
- case *bcproto.Message_BlockResponse:
- return msg.BlockResponse, nil
- case *bcproto.Message_NoBlockResponse:
- return msg.NoBlockResponse, nil
- case *bcproto.Message_StatusRequest:
- return msg.StatusRequest, nil
- case *bcproto.Message_StatusResponse:
- return msg.StatusResponse, nil
- default:
- return nil, fmt.Errorf("unknown message type %T", msg)
- }
- }
// ValidateMsg validates a message.
func ValidateMsg(pb proto.Message) error {
if pb == nil {
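
With EncodeMsg and DecodeMsg gone, reactors no longer wrap messages in the bcproto.Message container or touch raw bytes; the p2p layer does both, guided by the MessageType registered on the channel descriptor in the reactor diff below. The shape of the change, condensed from this commit:

// Before: wrap, marshal, and push raw bytes onto a channel by hand.
//   bz, err := EncodeMsg(&bcproto.StatusRequest{})
//   if err != nil { /* handle */ }
//   src.TrySend(BlocksyncChannel, bz)
//
// After: hand the typed message to the transport in an envelope; wrapping
// into bcproto.Message and marshalling happen inside the p2p stack.
src.TrySend(p2p.Envelope{
	ChannelID: BlocksyncChannel,
	Message:   &bcproto.StatusRequest{},
})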


@@ -80,7 +80,7 @@ func TestBcStatusResponseMessageValidateBasic(t *testing.T) {
}
//nolint:lll // ignore line length in tests
- func TestBlockchainMessageVectors(t *testing.T) {
+ func TestBlocksyncMessageVectors(t *testing.T) {
block := types.MakeBlock(int64(3), []types.Tx{types.Tx("Hello World")}, nil, nil)
block.Version.Block = 11 // overwrite updated protocol version


@@ -32,6 +32,7 @@ const (
maxTotalRequesters = 600
maxPendingRequests = maxTotalRequesters
maxPendingRequestsPerPeer = 20
+ requestRetrySeconds = 30
// Minimum recv rate to ensure we're receiving blocks from a peer fast
// enough. If a peer is not sending us data at least at that rate, we
@@ -630,7 +631,7 @@ OUTER_LOOP:
}
peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
if peer == nil {
- // log.Info("No peers available", "height", height)
+ bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height)
time.Sleep(requestIntervalMS * time.Millisecond)
continue PICK_PEER_LOOP
}
@@ -640,6 +641,7 @@ OUTER_LOOP:
bpr.peerID = peer.id
bpr.mtx.Unlock()
+ to := time.NewTimer(requestRetrySeconds * time.Second)
// Send request and wait.
bpr.pool.sendRequest(bpr.height, peer.id)
WAIT_LOOP:
@@ -652,6 +654,11 @@ OUTER_LOOP:
return
case <-bpr.Quit():
return
+ case <-to.C:
+ bpr.Logger.Debug("Retrying block request after timeout", "height", bpr.height, "peer", bpr.peerID)
+ // Simulate a redo
+ bpr.reset()
+ continue OUTER_LOOP
case peerID := <-bpr.redoCh:
if peerID == bpr.peerID {
bpr.reset()
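
The new timer bounds how long a requester waits on a single peer before resetting and picking another. A self-contained sketch of the same select shape, with hypothetical channels standing in for the pool's plumbing:

package main

import (
	"fmt"
	"time"
)

// waitForBlock is a hypothetical reduction of the requester's WAIT_LOOP:
// it reports whether the request should be re-issued, either because the
// pool signalled a redo or because the retry timer fired first.
func waitForBlock(gotBlock, redo <-chan struct{}, retryAfter time.Duration) bool {
	to := time.NewTimer(retryAfter)
	defer to.Stop()
	select {
	case <-gotBlock:
		return false // block arrived in time
	case <-redo:
		return true // pool asked for a redo, e.g. the peer sent a bad block
	case <-to.C:
		return true // peer too slow: reset and re-request elsewhere
	}
}

func main() {
	redo := make(chan struct{})
	close(redo)
	fmt.Println("retry:", waitForBlock(nil, redo, 30*time.Second))
}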


@@ -30,7 +30,7 @@ const (
)
type consensusReactor interface {
- // for when we switch from blockchain reactor and block sync to
+ // for when we switch from blocksync reactor and block sync to
// the consensus machine
SwitchToConsensus(state sm.State, skipWAL bool)
}
@@ -58,11 +58,13 @@ type Reactor struct {
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
+ metrics *Metrics
}
// NewReactor returns new reactor instance.
func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore,
- blockSync bool) *Reactor {
+ blockSync bool, metrics *Metrics) *Reactor {
if state.LastBlockHeight != store.Height() {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
@@ -88,6 +90,7 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS
blockSync: blockSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
+ metrics: metrics,
}
bcR.BaseReactor = *p2p.NewBaseReactor("Reactor", bcR)
return bcR
@@ -143,21 +146,20 @@ func (bcR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: MaxMsgSize,
+ MessageType: &bcproto.Message{},
},
}
}
// AddPeer implements Reactor by sending our state to peer.
func (bcR *Reactor) AddPeer(peer p2p.Peer) {
- msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
- Base: bcR.store.Base(),
- Height: bcR.store.Height()})
- if err != nil {
- bcR.Logger.Error("could not convert msg to protobuf", "err", err)
- return
- }
- peer.Send(BlocksyncChannel, msgBytes)
+ peer.Send(p2p.Envelope{
+ ChannelID: BlocksyncChannel,
+ Message: &bcproto.StatusResponse{
+ Base: bcR.store.Base(),
+ Height: bcR.store.Height(),
+ },
+ })
// it's OK if send fails. will try later in poolRoutine
// peer is added to the pool once we receive the first
@@ -185,53 +187,38 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
return fmt.Errorf("failed to convert block to protobuf: %w", err)
}
- msgBytes, err := EncodeMsg(&bcproto.BlockResponse{
- Block: bl,
- ExtCommit: extCommit.ToProto(),
- })
- if err != nil {
- return fmt.Errorf("could not marshal msg: %w", err)
- }
- if !src.TrySend(BlocksyncChannel, msgBytes) {
+ if !src.TrySend(p2p.Envelope{
+ ChannelID: BlocksyncChannel,
+ Message: &bcproto.BlockResponse{Block: bl, ExtCommit: extCommit.ToProto()},
+ }) {
return fmt.Errorf("unable to queue blocksync message at height %d to peer %v", msg.Height, src)
}
return nil
}
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
- msgBytes, err := EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height})
- if err != nil {
- return fmt.Errorf("could not convert msg to protobuf: %w", err)
- }
- if !src.TrySend(BlocksyncChannel, msgBytes) {
+ if !src.TrySend(p2p.Envelope{
+ ChannelID: BlocksyncChannel,
+ Message: &bcproto.NoBlockResponse{Height: msg.Height},
+ }) {
return fmt.Errorf("unable to queue blocksync message at height %d to peer %v", msg.Height, src)
}
return nil
}
// Receive implements Reactor by handling 4 types of messages (look below).
- func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
- msg, err := DecodeMsg(msgBytes)
- if err != nil {
- bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
- bcR.Switch.StopPeerForError(src, err)
+ func (bcR *Reactor) Receive(e p2p.Envelope) {
+ if err := ValidateMsg(e.Message); err != nil {
+ bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", e.Message, "err", err)
+ bcR.Switch.StopPeerForError(e.Src, err)
return
}
- if err = ValidateMsg(msg); err != nil {
- bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
- bcR.Switch.StopPeerForError(src, err)
- return
- }
+ bcR.Logger.Debug("Receive", "e.Src", e.Src, "chID", e.ChannelID, "msg", e.Message)
- bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)
- switch msg := msg.(type) {
+ switch msg := e.Message.(type) {
case *bcproto.BlockRequest:
- bcR.respondToPeer(msg, src)
+ bcR.respondToPeer(msg, e.Src)
case *bcproto.BlockResponse:
block, err := types.BlockFromProto(msg.Block)
if err != nil {
@@ -241,30 +228,28 @@ func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
extCommit, err := types.ExtendedCommitFromProto(msg.ExtCommit)
if err != nil {
bcR.Logger.Error("failed to convert extended commit from proto",
"peer", src,
"peer", e.Src,
"err", err)
return
}
- if err := bcR.pool.AddBlock(src.ID(), block, extCommit, block.Size()); err != nil {
+ if err := bcR.pool.AddBlock(e.Src.ID(), block, extCommit, block.Size()); err != nil {
bcR.Logger.Error("failed to add block", "err", err)
}
case *bcproto.StatusRequest:
// Send peer our state.
- msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
- Height: bcR.store.Height(),
- Base: bcR.store.Base(),
- })
+ e.Src.TrySend(p2p.Envelope{
+ ChannelID: BlocksyncChannel,
+ Message: &bcproto.StatusResponse{
+ Height: bcR.store.Height(),
+ Base: bcR.store.Base(),
+ },
+ })
- if err != nil {
- bcR.Logger.Error("could not convert msg to protobuf", "err", err)
- return
- }
- src.TrySend(BlocksyncChannel, msgBytes)
case *bcproto.StatusResponse:
// Got a peer status. Unverified.
- bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height)
+ bcR.pool.SetPeerRange(e.Src.ID(), msg.Base, msg.Height)
case *bcproto.NoBlockResponse:
bcR.Logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height)
bcR.Logger.Debug("Peer does not have requested block", "peer", e.Src, "height", msg.Height)
default:
bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
@@ -273,6 +258,8 @@ func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (bcR *Reactor) poolRoutine(stateSynced bool) {
+ bcR.metrics.Syncing.Set(1)
+ defer bcR.metrics.Syncing.Set(0)
trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
defer trySyncTicker.Stop()
@@ -305,13 +292,10 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {
if peer == nil {
continue
}
- msgBytes, err := EncodeMsg(&bcproto.BlockRequest{Height: request.Height})
- if err != nil {
- bcR.Logger.Error("could not convert msg to proto", "err", err)
- continue
- }
- queued := peer.TrySend(BlocksyncChannel, msgBytes)
+ queued := peer.TrySend(p2p.Envelope{
+ ChannelID: BlocksyncChannel,
+ Message: &bcproto.BlockRequest{Height: request.Height},
+ })
if !queued {
bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
}
@@ -323,7 +307,7 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {
case <-statusUpdateTicker.C:
// ask for status updates
- go bcR.BroadcastStatusRequest() //nolint: errcheck
+ go bcR.BroadcastStatusRequest()
}
}
@@ -443,7 +427,7 @@ FOR_LOOP:
// TODO: same thing for app - but we would need a way to
// get the hash without persisting the state
- state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first)
+ state, err = bcR.blockExec.ApplyBlock(state, firstID, first)
if err != nil {
// TODO This is bad, are we zombie?
panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
@@ -466,14 +450,9 @@ FOR_LOOP:
}
// BroadcastStatusRequest broadcasts `BlockStore` base and height.
- func (bcR *Reactor) BroadcastStatusRequest() error {
- bm, err := EncodeMsg(&bcproto.StatusRequest{})
- if err != nil {
- bcR.Logger.Error("could not convert msg to proto", "err", err)
- return fmt.Errorf("could not convert msg to proto: %w", err)
- }
- bcR.Switch.Broadcast(BlocksyncChannel, bm)
- return nil
+ func (bcR *Reactor) BroadcastStatusRequest() {
+ bcR.Switch.Broadcast(p2p.Envelope{
+ ChannelID: BlocksyncChannel,
+ Message: &bcproto.StatusRequest{},
+ })
}
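
Taken together, these hunks replace the byte-oriented Receive with a typed dispatch, and BroadcastStatusRequest drops its error return because no marshalling step remains that could fail. A condensed view of the new dispatch (handle is a hypothetical name for what the diff calls Receive; only two cases shown):

func (bcR *Reactor) handle(e p2p.Envelope) {
	// Decoding already happened in the p2p layer; switch on the concrete type.
	switch msg := e.Message.(type) {
	case *bcproto.BlockRequest:
		bcR.respondToPeer(msg, e.Src)
	case *bcproto.StatusResponse:
		bcR.pool.SetPeerRange(e.Src.ID(), msg.Base, msg.Height)
	default:
		bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
	}
}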


@@ -82,7 +82,7 @@ func newReactor(
// pool.height is determined from the store.
fastSync := true
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(),
- mp, sm.EmptyEvidencePool{})
+ mp, sm.EmptyEvidencePool{}, blockStore)
if err = stateStore.Save(state); err != nil {
panic(err)
}
@@ -117,7 +117,7 @@ func newReactor(
}
blockStore.SaveBlockWithExtendedCommit(thisBlock, thisParts, seenExtCommit)
- state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock)
+ state, err = blockExec.ApplyBlock(state, blockID, thisBlock)
if err != nil {
panic(fmt.Errorf("error apply block: %w", err))
}
@@ -127,14 +127,14 @@ func newReactor(
}
}
- bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync)
- bcReactor.SetLogger(logger.With("module", "blockchain"))
+ bcReactor := NewReactor(state.Copy(), blockExec, blockStore, fastSync, NopMetrics())
+ bcReactor.SetLogger(logger.With("module", "blocksync"))
return ReactorPair{bcReactor, proxyApp}
}
func TestNoBlockResponse(t *testing.T) {
- config = cfg.ResetTestRoot("blockchain_reactor_test")
+ config = test.ResetTestRoot("blocksync_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(t)
@@ -146,7 +146,7 @@ func TestNoBlockResponse(t *testing.T) {
reactorPairs[1] = newReactor(t, log.TestingLogger(), genDoc, privVals[0], 0)
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
s.AddReactor("BLOCKSYNC", reactorPairs[i].reactor)
return s
}, p2p.Connect2Switches)
@@ -196,7 +196,7 @@ func TestNoBlockResponse(t *testing.T) {
// Alternatively we could actually dial a TCP conn but
// that seems extreme.
func TestBadBlockStopsPeer(t *testing.T) {
- config = cfg.ResetTestRoot("blockchain_reactor_test")
+ config = test.ResetTestRoot("blocksync_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(t)
@@ -221,7 +221,7 @@ func TestBadBlockStopsPeer(t *testing.T) {
reactorPairs[3] = newReactor(t, log.TestingLogger(), genDoc, privVals[0], 0)
switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
s.AddReactor("BLOCKSYNC", reactorPairs[i].reactor)
return s
}, p2p.Connect2Switches)
@@ -260,7 +260,7 @@ func TestBadBlockStopsPeer(t *testing.T) {
reactorPairs = append(reactorPairs, lastReactorPair)
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
s.AddReactor("BLOCKSYNC", reactorPairs[len(reactorPairs)-1].reactor)
return s
}, p2p.Connect2Switches)...)
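
The test churn above tracks two API changes merged here: NewBlockExecutor gains a block-store argument (so the executor can store and read extended commits), and ApplyBlock loses its middle return value (the retain height in earlier signatures). In caller terms, using only the signatures visible in this diff:

// Executor construction now threads the block store through.
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(),
	mp, sm.EmptyEvidencePool{}, blockStore)

// ApplyBlock now returns (State, error) rather than three values.
state, err = blockExec.ApplyBlock(state, blockID, thisBlock)
if err != nil {
	panic(fmt.Errorf("error apply block: %w", err))
}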