From dbe2146d0a01e5986a4fd27e8b2c7461eacaa883 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Fri, 14 Jan 2022 18:14:09 -0800 Subject: [PATCH 01/11] rpc: simplify the encoding of interface-typed arguments in JSON (#7600) Add package jsontypes that implements a subset of the custom libs/json package. Specifically it handles encoding and decoding of interface types wrapped in "tagged" JSON objects. It omits the deep reflection on arbitrary types, preserving only the handling of type tags wrapper encoding. - Register interface types (Evidence, PubKey, PrivKey) for tagged encoding. - Update the existing implementations to satisfy the type. - Register those types with the jsontypes registry. - Add string tags to 64-bit integer fields where needed. - Add marshalers to structs that export interface-typed fields. --- crypto/crypto.go | 7 ++ crypto/ed25519/ed25519.go | 10 ++ crypto/secp256k1/secp256k1.go | 10 ++ crypto/sr25519/encoding.go | 8 +- crypto/sr25519/privkey.go | 3 + crypto/sr25519/pubkey.go | 3 + internal/jsontypes/jsontypes.go | 109 ++++++++++++++++++++ internal/jsontypes/jsontypes_test.go | 83 +++++++++++++++ internal/p2p/conn/secret_connection_test.go | 1 + privval/file.go | 17 +++ rpc/client/http/http.go | 4 +- rpc/client/http/request.go | 20 ++++ types/block.go | 4 +- types/evidence.go | 29 ++++-- types/genesis.go | 16 ++- types/node_key.go | 21 +++- types/validator.go | 22 +++- types/vote.go | 2 +- version/version.go | 4 +- 19 files changed, 349 insertions(+), 24 deletions(-) create mode 100644 internal/jsontypes/jsontypes.go create mode 100644 internal/jsontypes/jsontypes_test.go diff --git a/crypto/crypto.go b/crypto/crypto.go index 8d44b82f5..4f0dc05e7 100644 --- a/crypto/crypto.go +++ b/crypto/crypto.go @@ -2,6 +2,7 @@ package crypto import ( "github.com/tendermint/tendermint/crypto/tmhash" + "github.com/tendermint/tendermint/internal/jsontypes" "github.com/tendermint/tendermint/libs/bytes" ) @@ -25,6 +26,9 @@ type PubKey interface { VerifySignature(msg []byte, sig []byte) bool Equals(PubKey) bool Type() string + + // Implementations must support tagged encoding in JSON. + jsontypes.Tagged } type PrivKey interface { @@ -33,6 +37,9 @@ type PrivKey interface { PubKey() PubKey Equals(PrivKey) bool Type() string + + // Implementations must support tagged encoding in JSON. + jsontypes.Tagged } type Symmetric interface { diff --git a/crypto/ed25519/ed25519.go b/crypto/ed25519/ed25519.go index 3ac7f6d07..8673ff4d5 100644 --- a/crypto/ed25519/ed25519.go +++ b/crypto/ed25519/ed25519.go @@ -12,6 +12,7 @@ import ( "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/tmhash" + "github.com/tendermint/tendermint/internal/jsontypes" tmjson "github.com/tendermint/tendermint/libs/json" ) @@ -58,11 +59,17 @@ const ( func init() { tmjson.RegisterType(PubKey{}, PubKeyName) tmjson.RegisterType(PrivKey{}, PrivKeyName) + + jsontypes.MustRegister(PubKey{}) + jsontypes.MustRegister(PrivKey{}) } // PrivKey implements crypto.PrivKey. type PrivKey []byte +// TypeTag satisfies the jsontypes.Tagged interface. +func (PrivKey) TypeTag() string { return PrivKeyName } + // Bytes returns the privkey byte format. func (privKey PrivKey) Bytes() []byte { return []byte(privKey) @@ -151,6 +158,9 @@ var _ crypto.PubKey = PubKey{} // PubKeyEd25519 implements crypto.PubKey for the Ed25519 signature scheme. type PubKey []byte +// TypeTag satisfies the jsontypes.Tagged interface. +func (PubKey) TypeTag() string { return PubKeyName } + // Address is the SHA256-20 of the raw pubkey bytes. func (pubKey PubKey) Address() crypto.Address { if len(pubKey) != PubKeySize { diff --git a/crypto/secp256k1/secp256k1.go b/crypto/secp256k1/secp256k1.go index c2c0c6017..f92b29c1f 100644 --- a/crypto/secp256k1/secp256k1.go +++ b/crypto/secp256k1/secp256k1.go @@ -10,6 +10,7 @@ import ( secp256k1 "github.com/btcsuite/btcd/btcec" "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/internal/jsontypes" tmjson "github.com/tendermint/tendermint/libs/json" // necessary for Bitcoin address format @@ -28,6 +29,9 @@ const ( func init() { tmjson.RegisterType(PubKey{}, PubKeyName) tmjson.RegisterType(PrivKey{}, PrivKeyName) + + jsontypes.MustRegister(PubKey{}) + jsontypes.MustRegister(PrivKey{}) } var _ crypto.PrivKey = PrivKey{} @@ -35,6 +39,9 @@ var _ crypto.PrivKey = PrivKey{} // PrivKey implements PrivKey. type PrivKey []byte +// TypeTag satisfies the jsontypes.Tagged interface. +func (PrivKey) TypeTag() string { return PrivKeyName } + // Bytes marshalls the private key using amino encoding. func (privKey PrivKey) Bytes() []byte { return []byte(privKey) @@ -138,6 +145,9 @@ const PubKeySize = 33 // This prefix is followed with the x-coordinate. type PubKey []byte +// TypeTag satisfies the jsontypes.Tagged interface. +func (PubKey) TypeTag() string { return PubKeyName } + // Address returns a Bitcoin style addresses: RIPEMD160(SHA256(pubkey)) func (pubKey PubKey) Address() crypto.Address { if len(pubKey) != PubKeySize { diff --git a/crypto/sr25519/encoding.go b/crypto/sr25519/encoding.go index c0a8a7925..8827ee0b1 100644 --- a/crypto/sr25519/encoding.go +++ b/crypto/sr25519/encoding.go @@ -1,6 +1,9 @@ package sr25519 -import tmjson "github.com/tendermint/tendermint/libs/json" +import ( + "github.com/tendermint/tendermint/internal/jsontypes" + tmjson "github.com/tendermint/tendermint/libs/json" +) const ( PrivKeyName = "tendermint/PrivKeySr25519" @@ -10,4 +13,7 @@ const ( func init() { tmjson.RegisterType(PubKey{}, PubKeyName) tmjson.RegisterType(PrivKey{}, PrivKeyName) + + jsontypes.MustRegister(PubKey{}) + jsontypes.MustRegister(PrivKey{}) } diff --git a/crypto/sr25519/privkey.go b/crypto/sr25519/privkey.go index f628ca1aa..4e9cc995f 100644 --- a/crypto/sr25519/privkey.go +++ b/crypto/sr25519/privkey.go @@ -29,6 +29,9 @@ type PrivKey struct { kp *sr25519.KeyPair } +// TypeTag satisfies the jsontypes.Tagged interface. +func (PrivKey) TypeTag() string { return PrivKeyName } + // Bytes returns the byte-encoded PrivKey. func (privKey PrivKey) Bytes() []byte { if privKey.kp == nil { diff --git a/crypto/sr25519/pubkey.go b/crypto/sr25519/pubkey.go index 7e701dd1f..717f25c8c 100644 --- a/crypto/sr25519/pubkey.go +++ b/crypto/sr25519/pubkey.go @@ -23,6 +23,9 @@ const ( // PubKey implements crypto.PubKey. type PubKey []byte +// TypeTag satisfies the jsontypes.Tagged interface. +func (PubKey) TypeTag() string { return PubKeyName } + // Address is the SHA256-20 of the raw pubkey bytes. func (pubKey PubKey) Address() crypto.Address { if len(pubKey) != PubKeySize { diff --git a/internal/jsontypes/jsontypes.go b/internal/jsontypes/jsontypes.go new file mode 100644 index 000000000..a4c3c53ff --- /dev/null +++ b/internal/jsontypes/jsontypes.go @@ -0,0 +1,109 @@ +// Package jsontypes supports decoding for interface types whose concrete +// implementations need to be stored as JSON. To do this, concrete values are +// packaged in wrapper objects having the form: +// +// { +// "type": "", +// "value": +// } +// +// This package provides a registry for type tag strings and functions to +// encode and decode wrapper objects. +package jsontypes + +import ( + "bytes" + "encoding/json" + "fmt" + "reflect" +) + +// The Tagged interface must be implemented by a type in order to register it +// with the jsontypes package. The TypeTag method returns a string label that +// is used to distinguish objects of that type. +type Tagged interface { + TypeTag() string +} + +// registry records the mapping from type tags to value types. Values in this +// map must be normalized to non-pointer types. +var registry = struct { + types map[string]reflect.Type +}{types: make(map[string]reflect.Type)} + +// register adds v to the type registry. It reports an error if the tag +// returned by v is already registered. +func register(v Tagged) error { + tag := v.TypeTag() + if t, ok := registry.types[tag]; ok { + return fmt.Errorf("type tag %q already registered to %v", tag, t) + } + typ := reflect.TypeOf(v) + if typ.Kind() == reflect.Ptr { + typ = typ.Elem() + } + registry.types[tag] = typ + return nil +} + +// MustRegister adds v to the type registry. It will panic if the tag returned +// by v is already registered. This function is meant for use during program +// initialization. +func MustRegister(v Tagged) { + if err := register(v); err != nil { + panic(err) + } +} + +type wrapper struct { + Type string `json:"type"` + Value json.RawMessage `json:"value"` +} + +// Marshal marshals a JSON wrapper object containing v. If v == nil, Marshal +// returns the JSON "null" value without error. +func Marshal(v Tagged) ([]byte, error) { + if v == nil { + return []byte("null"), nil + } + data, err := json.Marshal(v) + if err != nil { + return nil, err + } + return json.Marshal(wrapper{ + Type: v.TypeTag(), + Value: data, + }) +} + +// Unmarshal unmarshals a JSON wrapper object into v. It reports an error if +// the data do not encode a valid wrapper object, if the wrapper's type tag is +// not registered with jsontypes, or if the resulting value is not compatible +// with the type of v. +func Unmarshal(data []byte, v interface{}) error { + // Verify that the target is some kind of pointer. + target := reflect.ValueOf(v) + if target.Kind() != reflect.Ptr { + return fmt.Errorf("target %T is not a pointer", v) + } + + var w wrapper + dec := json.NewDecoder(bytes.NewReader(data)) + dec.DisallowUnknownFields() + if err := dec.Decode(&w); err != nil { + return fmt.Errorf("invalid type wrapper: %w", err) + } + typ, ok := registry.types[w.Type] + if !ok { + return fmt.Errorf("unknown type tag: %q", w.Type) + } else if !typ.AssignableTo(target.Elem().Type()) { + return fmt.Errorf("type %v not assignable to %T", typ, v) + } + + obj := reflect.New(typ) + if err := json.Unmarshal(w.Value, obj.Interface()); err != nil { + return fmt.Errorf("decoding wrapped value: %w", err) + } + target.Elem().Set(obj.Elem()) + return nil +} diff --git a/internal/jsontypes/jsontypes_test.go b/internal/jsontypes/jsontypes_test.go new file mode 100644 index 000000000..b721e2b84 --- /dev/null +++ b/internal/jsontypes/jsontypes_test.go @@ -0,0 +1,83 @@ +package jsontypes_test + +import ( + "testing" + + "github.com/tendermint/tendermint/internal/jsontypes" +) + +type testType struct { + Field string `json:"field"` +} + +func (*testType) TypeTag() string { return "test/TaggedType" } + +func TestRoundTrip(t *testing.T) { + const wantEncoded = `{"type":"test/TaggedType","value":{"field":"hello"}}` + + t.Run("MustRegisterOK", func(t *testing.T) { + defer func() { + if x := recover(); x != nil { + t.Fatalf("Registration panicked: %v", x) + } + }() + jsontypes.MustRegister((*testType)(nil)) + }) + + t.Run("MustRegisterFail", func(t *testing.T) { + defer func() { + if x := recover(); x != nil { + t.Logf("Got expected panic: %v", x) + } + }() + jsontypes.MustRegister((*testType)(nil)) + t.Fatal("Registration should not have succeeded") + }) + + t.Run("MarshalNil", func(t *testing.T) { + bits, err := jsontypes.Marshal(nil) + if err != nil { + t.Fatalf("Marshal failed: %v", err) + } + if got := string(bits); got != "null" { + t.Errorf("Marshal nil: got %#q, want null", got) + } + }) + + t.Run("RoundTrip", func(t *testing.T) { + obj := testType{Field: "hello"} + bits, err := jsontypes.Marshal(&obj) + if err != nil { + t.Fatalf("Marshal %T failed: %v", obj, err) + } + if got := string(bits); got != wantEncoded { + t.Errorf("Marshal %T: got %#q, want %#q", obj, got, wantEncoded) + } + + var cmp testType + if err := jsontypes.Unmarshal(bits, &cmp); err != nil { + t.Errorf("Unmarshal %#q failed: %v", string(bits), err) + } + if obj != cmp { + t.Errorf("Unmarshal %#q: got %+v, want %+v", string(bits), cmp, obj) + } + }) + + t.Run("Unregistered", func(t *testing.T) { + obj := testType{Field: "hello"} + bits, err := jsontypes.Marshal(&obj) + if err != nil { + t.Fatalf("Marshal %T failed: %v", obj, err) + } + if got := string(bits); got != wantEncoded { + t.Errorf("Marshal %T: got %#q, want %#q", obj, got, wantEncoded) + } + + var cmp struct { + Field string `json:"field"` + } + if err := jsontypes.Unmarshal(bits, &cmp); err != nil { + t.Errorf("Unmarshal %#q: got %+v, want %+v", string(bits), cmp, obj) + } + }) +} diff --git a/internal/p2p/conn/secret_connection_test.go b/internal/p2p/conn/secret_connection_test.go index 7bc5e0b34..362c8102f 100644 --- a/internal/p2p/conn/secret_connection_test.go +++ b/internal/p2p/conn/secret_connection_test.go @@ -52,6 +52,7 @@ func (pk privKeyWithNilPubKey) Sign(msg []byte) ([]byte, error) { return pk.orig func (pk privKeyWithNilPubKey) PubKey() crypto.PubKey { return nil } func (pk privKeyWithNilPubKey) Equals(pk2 crypto.PrivKey) bool { return pk.orig.Equals(pk2) } func (pk privKeyWithNilPubKey) Type() string { return "privKeyWithNilPubKey" } +func (privKeyWithNilPubKey) TypeTag() string { return "test/privKeyWithNilPubKey" } func TestSecretConnectionHandshake(t *testing.T) { fooSecConn, barSecConn := makeSecretConnPair(t) diff --git a/privval/file.go b/privval/file.go index a075323a8..b11346dc7 100644 --- a/privval/file.go +++ b/privval/file.go @@ -14,6 +14,7 @@ import ( "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/secp256k1" + "github.com/tendermint/tendermint/internal/jsontypes" "github.com/tendermint/tendermint/internal/libs/protoio" "github.com/tendermint/tendermint/internal/libs/tempfile" tmbytes "github.com/tendermint/tendermint/libs/bytes" @@ -55,6 +56,22 @@ type FilePVKey struct { filePath string } +func (pvKey FilePVKey) MarshalJSON() ([]byte, error) { + pubk, err := jsontypes.Marshal(pvKey.PubKey) + if err != nil { + return nil, err + } + privk, err := jsontypes.Marshal(pvKey.PrivKey) + if err != nil { + return nil, err + } + return json.Marshal(struct { + Address types.Address `json:"address"` + PubKey json.RawMessage `json:"pub_key"` + PrivKey json.RawMessage `json:"priv_key"` + }{Address: pvKey.Address, PubKey: pubk, PrivKey: privk}) +} + // Save persists the FilePVKey to its filePath. func (pvKey FilePVKey) Save() error { outFile := pvKey.filePath diff --git a/rpc/client/http/http.go b/rpc/client/http/http.go index 7dd6abb76..f10985ae3 100644 --- a/rpc/client/http/http.go +++ b/rpc/client/http/http.go @@ -510,7 +510,9 @@ func (c *baseRPCClient) BroadcastEvidence( ev types.Evidence, ) (*coretypes.ResultBroadcastEvidence, error) { result := new(coretypes.ResultBroadcastEvidence) - if err := c.caller.Call(ctx, "broadcast_evidence", map[string]interface{}{"evidence": ev}, result); err != nil { + if err := c.caller.Call(ctx, "broadcast_evidence", evidenceArgs{ + Evidence: ev, + }, result); err != nil { return nil, err } return result, nil diff --git a/rpc/client/http/request.go b/rpc/client/http/request.go index 5d1d3db5b..a6f85b637 100644 --- a/rpc/client/http/request.go +++ b/rpc/client/http/request.go @@ -4,7 +4,11 @@ package http // from the client to the server. import ( + "encoding/json" + + "github.com/tendermint/tendermint/internal/jsontypes" "github.com/tendermint/tendermint/libs/bytes" + "github.com/tendermint/tendermint/types" ) type abciQueryArgs struct { @@ -57,3 +61,19 @@ type validatorArgs struct { Page *int `json:"page,string,omitempty"` PerPage *int `json:"per_page,string,omitempty"` } + +type evidenceArgs struct { + Evidence types.Evidence +} + +// MarshalJSON implements json.Marshaler to encode the evidence using the +// wrapped concrete type of the implementation. +func (e evidenceArgs) MarshalJSON() ([]byte, error) { + ev, err := jsontypes.Marshal(e.Evidence) + if err != nil { + return nil, err + } + return json.Marshal(struct { + Evidence json.RawMessage `json:"evidence"` + }{Evidence: ev}) +} diff --git a/types/block.go b/types/block.go index e09d1830a..241c360de 100644 --- a/types/block.go +++ b/types/block.go @@ -334,7 +334,7 @@ type Header struct { // basic block info Version version.Consensus `json:"version"` ChainID string `json:"chain_id"` - Height int64 `json:"height"` + Height int64 `json:"height,string"` Time time.Time `json:"time"` // prev block info @@ -748,7 +748,7 @@ type Commit struct { // ValidatorSet order. // Any peer with a block can gossip signatures by index with a peer without // recalculating the active ValidatorSet. - Height int64 `json:"height"` + Height int64 `json:"height,string"` Round int32 `json:"round"` BlockID BlockID `json:"block_id"` Signatures []CommitSig `json:"signatures"` diff --git a/types/evidence.go b/types/evidence.go index fa530761f..cca6fc899 100644 --- a/types/evidence.go +++ b/types/evidence.go @@ -13,6 +13,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto/merkle" "github.com/tendermint/tendermint/crypto/tmhash" + "github.com/tendermint/tendermint/internal/jsontypes" tmjson "github.com/tendermint/tendermint/libs/json" tmrand "github.com/tendermint/tendermint/libs/rand" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" @@ -28,6 +29,9 @@ type Evidence interface { String() string // string format of the evidence Time() time.Time // time of the infraction ValidateBasic() error // basic consistency check + + // Implementations must support tagged encoding in JSON. + jsontypes.Tagged } //-------------------------------------------------------------------------------------- @@ -38,11 +42,14 @@ type DuplicateVoteEvidence struct { VoteB *Vote `json:"vote_b"` // abci specific information - TotalVotingPower int64 - ValidatorPower int64 + TotalVotingPower int64 `json:",string"` + ValidatorPower int64 `json:",string"` Timestamp time.Time } +// TypeTag implements the jsontypes.Tagged interface. +func (*DuplicateVoteEvidence) TypeTag() string { return "tendermint/DuplicateVoteEvidence" } + var _ Evidence = &DuplicateVoteEvidence{} // NewDuplicateVoteEvidence creates DuplicateVoteEvidence with right ordering given @@ -236,14 +243,17 @@ func DuplicateVoteEvidenceFromProto(pb *tmproto.DuplicateVoteEvidence) (*Duplica // height, then nodes will treat this as of the Lunatic form, else it is of the Equivocation form. type LightClientAttackEvidence struct { ConflictingBlock *LightBlock - CommonHeight int64 + CommonHeight int64 `json:",string"` // abci specific information ByzantineValidators []*Validator // validators in the validator set that misbehaved in creating the conflicting block - TotalVotingPower int64 // total voting power of the validator set at the common height + TotalVotingPower int64 `json:",string"` // total voting power of the validator set at the common height Timestamp time.Time // timestamp of the block at the common height } +// TypeTag implements the jsontypes.Tagged interface. +func (*LightClientAttackEvidence) TypeTag() string { return "tendermint/LightClientAttackEvidence" } + var _ Evidence = &LightClientAttackEvidence{} // ABCI forms an array of abci evidence for each byzantine validator @@ -365,10 +375,10 @@ func (l *LightClientAttackEvidence) Height() int64 { // String returns a string representation of LightClientAttackEvidence func (l *LightClientAttackEvidence) String() string { return fmt.Sprintf(`LightClientAttackEvidence{ - ConflictingBlock: %v, - CommonHeight: %d, - ByzatineValidators: %v, - TotalVotingPower: %d, + ConflictingBlock: %v, + CommonHeight: %d, + ByzatineValidators: %v, + TotalVotingPower: %d, Timestamp: %v}#%X`, l.ConflictingBlock.String(), l.CommonHeight, l.ByzantineValidators, l.TotalVotingPower, l.Timestamp, l.Hash()) @@ -630,6 +640,9 @@ func EvidenceFromProto(evidence *tmproto.Evidence) (Evidence, error) { func init() { tmjson.RegisterType(&DuplicateVoteEvidence{}, "tendermint/DuplicateVoteEvidence") tmjson.RegisterType(&LightClientAttackEvidence{}, "tendermint/LightClientAttackEvidence") + + jsontypes.MustRegister((*DuplicateVoteEvidence)(nil)) + jsontypes.MustRegister((*LightClientAttackEvidence)(nil)) } //-------------------------------------------- ERRORS -------------------------------------- diff --git a/types/genesis.go b/types/genesis.go index a4b3904ab..d89de6ea2 100644 --- a/types/genesis.go +++ b/types/genesis.go @@ -9,6 +9,7 @@ import ( "time" "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/internal/jsontypes" tmbytes "github.com/tendermint/tendermint/libs/bytes" tmjson "github.com/tendermint/tendermint/libs/json" tmtime "github.com/tendermint/tendermint/libs/time" @@ -29,10 +30,23 @@ const ( type GenesisValidator struct { Address Address `json:"address"` PubKey crypto.PubKey `json:"pub_key"` - Power int64 `json:"power"` + Power int64 `json:"power,string"` Name string `json:"name"` } +func (g GenesisValidator) MarshalJSON() ([]byte, error) { + pk, err := jsontypes.Marshal(g.PubKey) + if err != nil { + return nil, err + } + return json.Marshal(struct { + Address Address `json:"address"` + PubKey json.RawMessage `json:"pub_key"` + Power int64 `json:"power,string"` + Name string `json:"name"` + }{Address: g.Address, PubKey: pk, Power: g.Power, Name: g.Name}) +} + // GenesisDoc defines the initial conditions for a tendermint blockchain, in particular its validator set. type GenesisDoc struct { GenesisTime time.Time `json:"genesis_time"` diff --git a/types/node_key.go b/types/node_key.go index aecbd8a21..8f59b6085 100644 --- a/types/node_key.go +++ b/types/node_key.go @@ -1,10 +1,12 @@ package types import ( + "encoding/json" "os" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/ed25519" + "github.com/tendermint/tendermint/internal/jsontypes" tmjson "github.com/tendermint/tendermint/libs/json" tmos "github.com/tendermint/tendermint/libs/os" ) @@ -22,14 +24,25 @@ type NodeKey struct { PrivKey crypto.PrivKey `json:"priv_key"` } +func (nk NodeKey) MarshalJSON() ([]byte, error) { + pk, err := jsontypes.Marshal(nk.PrivKey) + if err != nil { + return nil, err + } + return json.Marshal(struct { + ID NodeID `json:"id"` + PrivKey json.RawMessage `json:"priv_key"` + }{ID: nk.ID, PrivKey: pk}) +} + // PubKey returns the peer's PubKey -func (nodeKey NodeKey) PubKey() crypto.PubKey { - return nodeKey.PrivKey.PubKey() +func (nk NodeKey) PubKey() crypto.PubKey { + return nk.PrivKey.PubKey() } // SaveAs persists the NodeKey to filePath. -func (nodeKey NodeKey) SaveAs(filePath string) error { - jsonBytes, err := tmjson.Marshal(nodeKey) +func (nk NodeKey) SaveAs(filePath string) error { + jsonBytes, err := tmjson.Marshal(nk) if err != nil { return err } diff --git a/types/validator.go b/types/validator.go index ded8156bf..9d48c93e7 100644 --- a/types/validator.go +++ b/types/validator.go @@ -2,12 +2,14 @@ package types import ( "bytes" + "encoding/json" "errors" "fmt" "strings" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/encoding" + "github.com/tendermint/tendermint/internal/jsontypes" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" ) @@ -15,11 +17,23 @@ import ( // NOTE: The ProposerPriority is not included in Validator.Hash(); // make sure to update that method if changes are made here type Validator struct { - Address Address `json:"address"` - PubKey crypto.PubKey `json:"pub_key"` - VotingPower int64 `json:"voting_power"` + Address Address `json:"address"` + PubKey crypto.PubKey `json:"pub_key"` + VotingPower int64 `json:"voting_power,string"` + ProposerPriority int64 `json:"proposer_priority,string"` +} - ProposerPriority int64 `json:"proposer_priority"` +func (v Validator) MarshalJSON() ([]byte, error) { + pk, err := jsontypes.Marshal(v.PubKey) + if err != nil { + return nil, err + } + return json.Marshal(struct { + Addr Address `json:"address"` + PubKey json.RawMessage `json:"pub_key"` + Power int64 `json:"voting_power,string"` + Priority int64 `json:"proposer_priority,string"` + }{Addr: v.Address, PubKey: pk, Power: v.VotingPower, Priority: v.ProposerPriority}) } // NewValidator returns a new validator with the given pubkey and voting power. diff --git a/types/vote.go b/types/vote.go index 52fb73ec5..ceae65e48 100644 --- a/types/vote.go +++ b/types/vote.go @@ -49,7 +49,7 @@ type Address = crypto.Address // consensus. type Vote struct { Type tmproto.SignedMsgType `json:"type"` - Height int64 `json:"height"` + Height int64 `json:"height,string"` Round int32 `json:"round"` // assume there will not be greater than 2_147_483_647 rounds BlockID BlockID `json:"block_id"` // zero if vote is nil. Timestamp time.Time `json:"timestamp"` diff --git a/version/version.go b/version/version.go index e42952f77..483fca031 100644 --- a/version/version.go +++ b/version/version.go @@ -28,8 +28,8 @@ var ( ) type Consensus struct { - Block uint64 `json:"block"` - App uint64 `json:"app"` + Block uint64 `json:"block,string"` + App uint64 `json:"app,string"` } func (c Consensus) ToProto() tmversion.Consensus { From 701e6026c5468fff557e20335aea820224ba4697 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Gr=C3=B6bler?= Date: Sat, 15 Jan 2022 03:33:47 +0100 Subject: [PATCH 02/11] Update README.md (#7602) Fixed typo in exchange Co-authored-by: M. J. Fromberger --- docs/tendermint-core/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/tendermint-core/README.md b/docs/tendermint-core/README.md index 0de5ed908..d8af4a3d1 100644 --- a/docs/tendermint-core/README.md +++ b/docs/tendermint-core/README.md @@ -18,7 +18,7 @@ This section dives into the internals of Go-Tendermint. - [Mempool](./mempool/README.md) - [Light Client](./light-client.md) - [Consensus](./consensus/README.md) -- [Peer Exachange (PEX)](./pex/README.md) +- [Peer Exchange (PEX)](./pex/README.md) - [Evidence](./evidence/README.md) For full specifications refer to the [spec repo](https://github.com/tendermint/spec). From c24f003b556bb5195540337ee9ceb8fef5a0f776 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Sun, 16 Jan 2022 12:39:37 -0800 Subject: [PATCH 03/11] protoio: fix incorrect test assertion (#7606) After writing and then reading a bunch of random messages, the test was checking that it did not read the same number of messages that it wrote. The sense of this check was inverted; they should match. Introduced by accident in #7522. I'm not sure why this did not show up in CI. Edit: I now know why it didn't show up in ci: #7608. --- internal/libs/protoio/io_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/libs/protoio/io_test.go b/internal/libs/protoio/io_test.go index 1b81874af..4420ad786 100644 --- a/internal/libs/protoio/io_test.go +++ b/internal/libs/protoio/io_test.go @@ -95,7 +95,7 @@ func iotest(t *testing.T, writer protoio.WriteCloser, reader protoio.ReadCloser) } i++ } - require.NotEqual(t, size, i) + require.Equal(t, size, i, "messages read ≠ messages written") if err := reader.Close(); err != nil { return err } From 679b6a65b89b8e4bebb56d39cec38248ceeeaaa6 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Sun, 16 Jan 2022 13:48:21 -0800 Subject: [PATCH 04/11] light: fix provider error plumbing (#7610) The custom error types in the provider package did not propagate their wrapped underlying reasons, making it difficult for the test to check that the correct error was observed. - Fix the custom errors to have a true underlying error (not just a string). - Add Unwrap methods to support inspection by errors.Is. - Update usage in a few places. - Fix the test to check for acceptable variation. Fixes #7609. --- internal/statesync/dispatcher.go | 2 +- light/provider/errors.go | 10 +++++++--- light/provider/http/http.go | 8 ++++---- light/provider/http/http_test.go | 7 +++++-- 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/internal/statesync/dispatcher.go b/internal/statesync/dispatcher.go index 3f3e2a117..b12922343 100644 --- a/internal/statesync/dispatcher.go +++ b/internal/statesync/dispatcher.go @@ -195,7 +195,7 @@ func (p *BlockProvider) LightBlock(ctx context.Context, height int64) (*types.Li case errPeerAlreadyBusy: return nil, provider.ErrLightBlockNotFound default: - return nil, provider.ErrUnreliableProvider{Reason: err.Error()} + return nil, provider.ErrUnreliableProvider{Reason: err} } // check that the height requested is the same one returned diff --git a/light/provider/errors.go b/light/provider/errors.go index 355ec3475..d1a39f0c0 100644 --- a/light/provider/errors.go +++ b/light/provider/errors.go @@ -28,16 +28,20 @@ type ErrBadLightBlock struct { } func (e ErrBadLightBlock) Error() string { - return fmt.Sprintf("client provided bad signed header: %s", e.Reason.Error()) + return fmt.Sprintf("client provided bad signed header: %v", e.Reason) } +func (e ErrBadLightBlock) Unwrap() error { return e.Reason } + // ErrUnreliableProvider is a generic error that indicates that the provider isn't // behaving in a reliable manner to the light client. The light client will // remove the provider type ErrUnreliableProvider struct { - Reason string + Reason error } func (e ErrUnreliableProvider) Error() string { - return fmt.Sprintf("client deemed unreliable: %s", e.Reason) + return fmt.Sprintf("client deemed unreliable: %v", e.Reason) } + +func (e ErrUnreliableProvider) Unwrap() error { return e.Reason } diff --git a/light/provider/http/http.go b/light/provider/http/http.go index f8bf7d29e..79bb56c56 100644 --- a/light/provider/http/http.go +++ b/light/provider/http/http.go @@ -212,7 +212,7 @@ func (p *http) validatorSet(ctx context.Context, height *int64) (*types.Validato // If we don't know the error then by default we return an unreliable provider error and // terminate the connection with the peer. - return nil, provider.ErrUnreliableProvider{Reason: e.Error()} + return nil, provider.ErrUnreliableProvider{Reason: e} } // update the total and increment the page index so we can fetch the @@ -268,7 +268,7 @@ func (p *http) signedHeader(ctx context.Context, height *int64) (*types.SignedHe // If we don't know the error then by default we return an unreliable provider error and // terminate the connection with the peer. - return nil, provider.ErrUnreliableProvider{Reason: e.Error()} + return nil, provider.ErrUnreliableProvider{Reason: e} } } return nil, p.noResponse() @@ -278,7 +278,7 @@ func (p *http) noResponse() error { p.noResponseCount++ if p.noResponseCount > p.noResponseThreshold { return provider.ErrUnreliableProvider{ - Reason: fmt.Sprintf("failed to respond after %d attempts", p.noResponseCount), + Reason: fmt.Errorf("failed to respond after %d attempts", p.noResponseCount), } } return provider.ErrNoResponse @@ -288,7 +288,7 @@ func (p *http) noBlock(e error) error { p.noBlockCount++ if p.noBlockCount > p.noBlockThreshold { return provider.ErrUnreliableProvider{ - Reason: fmt.Sprintf("failed to provide a block after %d attempts", p.noBlockCount), + Reason: fmt.Errorf("failed to provide a block after %d attempts", p.noBlockCount), } } return e diff --git a/light/provider/http/http_test.go b/light/provider/http/http_test.go index 384a570ae..71cd78563 100644 --- a/light/provider/http/http_test.go +++ b/light/provider/http/http_test.go @@ -2,6 +2,7 @@ package http_test import ( "context" + "errors" "fmt" "testing" "time" @@ -100,8 +101,10 @@ func TestProvider(t *testing.T) { time.Sleep(10 * time.Second) lb, err = p.LightBlock(ctx, lower+2) - // we should see a connection refused + // Either the connection should be refused, or the context canceled. require.Error(t, err) require.Nil(t, lb) - assert.Equal(t, provider.ErrConnectionClosed, err) + if !errors.Is(err, provider.ErrConnectionClosed) && !errors.Is(err, context.Canceled) { + assert.Fail(t, "Incorrect error", "wanted connection closed or context canceled, got %v", err) + } } From 49153b753c5d900a3eec09da56568bb3e5e66685 Mon Sep 17 00:00:00 2001 From: Kene <48869444+spaceCh1mp@users.noreply.github.com> Date: Tue, 18 Jan 2022 10:58:32 +0100 Subject: [PATCH 05/11] rpc: paginate mempool /unconfirmed_txs endpoint (#7612) This commit changes the behaviour of the /unconfirmed_txs endpoint by replacing limit with a page and perPage parameter for pagination. The test case for unconfirmed_txs have been accommodated to properly test this change and the documentation for the API as well. --- CHANGELOG_PENDING.md | 1 + internal/rpc/core/mempool.go | 26 ++++++++++++------- internal/rpc/core/routes.go | 4 +-- light/rpc/client.go | 4 +-- rpc/client/http/http.go | 5 ++-- rpc/client/http/request.go | 3 ++- rpc/client/interface.go | 2 +- rpc/client/local/local.go | 4 +-- rpc/client/rpc_test.go | 49 +++++++++++++++++++++++------------- rpc/openapi/openapi.yaml | 14 ++++++++--- 10 files changed, 73 insertions(+), 39 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 1609e9755..7a9c98700 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -53,6 +53,7 @@ Special thanks to external contributors on this release: - [pubsub] \#7319 Performance improvements for the event query API (@creachadair) - [node] \#7521 Define concrete type for seed node implementation (@spacech1mp) +- [rpc] \#7612 paginate mempool /unconfirmed_txs rpc endpoint (@spacech1mp) ### BUG FIXES diff --git a/internal/rpc/core/mempool.go b/internal/rpc/core/mempool.go index 751c7ee73..4087439cb 100644 --- a/internal/rpc/core/mempool.go +++ b/internal/rpc/core/mempool.go @@ -10,6 +10,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/state/indexer" + tmmath "github.com/tendermint/tendermint/libs/math" "github.com/tendermint/tendermint/rpc/coretypes" "github.com/tendermint/tendermint/types" ) @@ -117,19 +118,26 @@ func (env *Environment) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*co } } -// UnconfirmedTxs gets unconfirmed transactions (maximum ?limit entries) -// including their number. +// UnconfirmedTxs gets unconfirmed transactions from the mempool in order of priority // More: https://docs.tendermint.com/master/rpc/#/Info/unconfirmed_txs -func (env *Environment) UnconfirmedTxs(ctx context.Context, limitPtr *int) (*coretypes.ResultUnconfirmedTxs, error) { - // reuse per_page validator - limit := env.validatePerPage(limitPtr) +func (env *Environment) UnconfirmedTxs(ctx context.Context, pagePtr, perPagePtr *int) (*coretypes.ResultUnconfirmedTxs, error) { + totalCount := env.Mempool.Size() + perPage := env.validatePerPage(perPagePtr) + page, err := validatePage(pagePtr, perPage, totalCount) + if err != nil { + return nil, err + } + + skipCount := validateSkipCount(page, perPage) + + txs := env.Mempool.ReapMaxTxs(skipCount + tmmath.MinInt(perPage, totalCount-skipCount)) + result := txs[skipCount:] - txs := env.Mempool.ReapMaxTxs(limit) return &coretypes.ResultUnconfirmedTxs{ - Count: len(txs), - Total: env.Mempool.Size(), + Count: len(result), + Total: totalCount, TotalBytes: env.Mempool.SizeBytes(), - Txs: txs}, nil + Txs: result}, nil } // NumUnconfirmedTxs gets number of unconfirmed transactions. diff --git a/internal/rpc/core/routes.go b/internal/rpc/core/routes.go index fd26ab50e..09be47c5c 100644 --- a/internal/rpc/core/routes.go +++ b/internal/rpc/core/routes.go @@ -55,7 +55,7 @@ func NewRoutesMap(svc RPCService, opts *RouteOptions) RoutesMap { "dump_consensus_state": rpc.NewRPCFunc(svc.DumpConsensusState), "consensus_state": rpc.NewRPCFunc(svc.GetConsensusState), "consensus_params": rpc.NewRPCFunc(svc.ConsensusParams, "height"), - "unconfirmed_txs": rpc.NewRPCFunc(svc.UnconfirmedTxs, "limit"), + "unconfirmed_txs": rpc.NewRPCFunc(svc.UnconfirmedTxs, "page", "per_page"), "num_unconfirmed_txs": rpc.NewRPCFunc(svc.NumUnconfirmedTxs), // tx broadcast API @@ -107,7 +107,7 @@ type RPCService interface { Subscribe(ctx context.Context, query string) (*coretypes.ResultSubscribe, error) Tx(ctx context.Context, hash bytes.HexBytes, prove bool) (*coretypes.ResultTx, error) TxSearch(ctx context.Context, query string, prove bool, pagePtr, perPagePtr *int, orderBy string) (*coretypes.ResultTxSearch, error) - UnconfirmedTxs(ctx context.Context, limitPtr *int) (*coretypes.ResultUnconfirmedTxs, error) + UnconfirmedTxs(ctx context.Context, page, perPage *int) (*coretypes.ResultUnconfirmedTxs, error) Unsubscribe(ctx context.Context, query string) (*coretypes.ResultUnsubscribe, error) UnsubscribeAll(ctx context.Context) (*coretypes.ResultUnsubscribe, error) Validators(ctx context.Context, heightPtr *int64, pagePtr, perPagePtr *int) (*coretypes.ResultValidators, error) diff --git a/light/rpc/client.go b/light/rpc/client.go index fec6e4723..272100422 100644 --- a/light/rpc/client.go +++ b/light/rpc/client.go @@ -211,8 +211,8 @@ func (c *Client) BroadcastTxSync(ctx context.Context, tx types.Tx) (*coretypes.R return c.next.BroadcastTxSync(ctx, tx) } -func (c *Client) UnconfirmedTxs(ctx context.Context, limit *int) (*coretypes.ResultUnconfirmedTxs, error) { - return c.next.UnconfirmedTxs(ctx, limit) +func (c *Client) UnconfirmedTxs(ctx context.Context, page, perPage *int) (*coretypes.ResultUnconfirmedTxs, error) { + return c.next.UnconfirmedTxs(ctx, page, perPage) } func (c *Client) NumUnconfirmedTxs(ctx context.Context) (*coretypes.ResultUnconfirmedTxs, error) { diff --git a/rpc/client/http/http.go b/rpc/client/http/http.go index f10985ae3..f3ae8fe1c 100644 --- a/rpc/client/http/http.go +++ b/rpc/client/http/http.go @@ -281,11 +281,12 @@ func (c *baseRPCClient) broadcastTX( func (c *baseRPCClient) UnconfirmedTxs( ctx context.Context, - limit *int, + page *int, + perPage *int, ) (*coretypes.ResultUnconfirmedTxs, error) { result := new(coretypes.ResultUnconfirmedTxs) - if err := c.caller.Call(ctx, "unconfirmed_txs", unconfirmedArgs{Limit: limit}, result); err != nil { + if err := c.caller.Call(ctx, "unconfirmed_txs", unconfirmedArgs{Page: page, PerPage: perPage}, result); err != nil { return nil, err } return result, nil diff --git a/rpc/client/http/request.go b/rpc/client/http/request.go index a6f85b637..88d6b1d1b 100644 --- a/rpc/client/http/request.go +++ b/rpc/client/http/request.go @@ -27,7 +27,8 @@ type txKeyArgs struct { } type unconfirmedArgs struct { - Limit *int `json:"limit,string,omitempty"` + Page *int `json:"page,string,omitempty"` + PerPage *int `json:"per_page,string,omitempty"` } type heightArgs struct { diff --git a/rpc/client/interface.go b/rpc/client/interface.go index 9b2a600cc..d5bbaec1b 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -142,7 +142,7 @@ type EventsClient interface { // MempoolClient shows us data about current mempool state. type MempoolClient interface { - UnconfirmedTxs(ctx context.Context, limit *int) (*coretypes.ResultUnconfirmedTxs, error) + UnconfirmedTxs(ctx context.Context, page, perPage *int) (*coretypes.ResultUnconfirmedTxs, error) NumUnconfirmedTxs(context.Context) (*coretypes.ResultUnconfirmedTxs, error) CheckTx(context.Context, types.Tx) (*coretypes.ResultCheckTx, error) RemoveTx(context.Context, types.TxKey) error diff --git a/rpc/client/local/local.go b/rpc/client/local/local.go index 7f2ab46d4..95f3c63b9 100644 --- a/rpc/client/local/local.go +++ b/rpc/client/local/local.go @@ -97,8 +97,8 @@ func (c *Local) BroadcastTxSync(ctx context.Context, tx types.Tx) (*coretypes.Re return c.env.BroadcastTxSync(ctx, tx) } -func (c *Local) UnconfirmedTxs(ctx context.Context, limit *int) (*coretypes.ResultUnconfirmedTxs, error) { - return c.env.UnconfirmedTxs(ctx, limit) +func (c *Local) UnconfirmedTxs(ctx context.Context, page, perPage *int) (*coretypes.ResultUnconfirmedTxs, error) { + return c.env.UnconfirmedTxs(ctx, page, perPage) } func (c *Local) NumUnconfirmedTxs(ctx context.Context) (*coretypes.ResultUnconfirmedTxs, error) { diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index 37cc9707b..3e0e39554 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -612,29 +612,44 @@ func TestClientMethodCallsAdvanced(t *testing.T) { pool := getMempool(t, n) t.Run("UnconfirmedTxs", func(t *testing.T) { - _, _, tx := MakeTxKV() - ch := make(chan struct{}) + // populate mempool with 5 tx + txs := make([]types.Tx, 5) + ch := make(chan error, 5) + for i := 0; i < 5; i++ { + _, _, tx := MakeTxKV() - err := pool.CheckTx(ctx, tx, func(_ *abci.Response) { close(ch) }, mempool.TxInfo{}) - require.NoError(t, err) + txs[i] = tx + err := pool.CheckTx(ctx, tx, func(_ *abci.Response) { ch <- nil }, mempool.TxInfo{}) - // wait for tx to arrive in mempoool. - select { - case <-ch: - case <-time.After(5 * time.Second): - t.Error("Timed out waiting for CheckTx callback") + require.NoError(t, err) } + // wait for tx to arrive in mempoool. + for i := 0; i < 5; i++ { + select { + case <-ch: + case <-time.After(5 * time.Second): + t.Error("Timed out waiting for CheckTx callback") + } + } + close(ch) for _, c := range GetClients(t, n, conf) { - mc := c.(client.MempoolClient) - limit := 1 - res, err := mc.UnconfirmedTxs(ctx, &limit) - require.NoError(t, err) + for i := 1; i <= 2; i++ { + mc := c.(client.MempoolClient) + page, perPage := i, 3 + res, err := mc.UnconfirmedTxs(ctx, &page, &perPage) + require.NoError(t, err) - assert.Equal(t, 1, res.Count) - assert.Equal(t, 1, res.Total) - assert.Equal(t, pool.SizeBytes(), res.TotalBytes) - assert.Exactly(t, types.Txs{tx}, types.Txs(res.Txs)) + if i == 2 { + perPage = 2 + } + assert.Equal(t, perPage, res.Count) + assert.Equal(t, 5, res.Total) + assert.Equal(t, pool.SizeBytes(), res.TotalBytes) + for _, tx := range res.Txs { + assert.Contains(t, txs, tx) + } + } } pool.Flush() diff --git a/rpc/openapi/openapi.yaml b/rpc/openapi/openapi.yaml index bcb5739dd..ba48ee82e 100644 --- a/rpc/openapi/openapi.yaml +++ b/rpc/openapi/openapi.yaml @@ -1062,13 +1062,21 @@ paths: operationId: unconfirmed_txs parameters: - in: query - name: limit - description: Maximum number of unconfirmed transactions to return (max 100) + name: page + description: "Page number (1-based)" required: false schema: type: integer - default: 30 + default: 1 example: 1 + - in: query + name: per_page + description: "Number of entries per page (max: 100)" + required: false + schema: + type: integer + example: 100 + default: 30 tags: - Info description: | From c0b56e207aa4c5d6fb7e4e41bbd61ca9fc1b7774 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Tue, 18 Jan 2022 08:55:13 -0500 Subject: [PATCH 06/11] consensus: test shutdown to avoid hangs (#7603) --- internal/consensus/byzantine_test.go | 1 - internal/consensus/replay_test.go | 2 ++ internal/consensus/state_test.go | 6 +++++- internal/consensus/wal_test.go | 6 +++++- 4 files changed, 12 insertions(+), 3 deletions(-) diff --git a/internal/consensus/byzantine_test.go b/internal/consensus/byzantine_test.go index 5408f2969..0e3013d95 100644 --- a/internal/consensus/byzantine_test.go +++ b/internal/consensus/byzantine_test.go @@ -255,7 +255,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { } msg, err := s.Next(ctx) - assert.NoError(t, err) if err != nil { cancel() diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index 19d86dac8..bf790bf90 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -201,6 +201,8 @@ LOOP: i++ select { + case <-rctx.Done(): + t.Fatal("context canceled before test completed") case err := <-walPanicked: // make sure we can make blocks after a crash startNewStateAndWaitForBlock(ctx, t, consensusReplayConfig, cs.Height, blockDB, stateStore) diff --git a/internal/consensus/state_test.go b/internal/consensus/state_test.go index 25158a1d8..43277eecc 100644 --- a/internal/consensus/state_test.go +++ b/internal/consensus/state_test.go @@ -2131,7 +2131,11 @@ func subscribe( t.Errorf("Subscription for %v unexpectedly terminated: %v", q, err) return } - ch <- next + select { + case ch <- next: + case <-ctx.Done(): + return + } } }() return ch diff --git a/internal/consensus/wal_test.go b/internal/consensus/wal_test.go index f686fece6..24b1d3dfc 100644 --- a/internal/consensus/wal_test.go +++ b/internal/consensus/wal_test.go @@ -3,6 +3,7 @@ package consensus import ( "bytes" "context" + "errors" "path/filepath" "testing" @@ -15,6 +16,7 @@ import ( "github.com/tendermint/tendermint/internal/consensus/types" "github.com/tendermint/tendermint/internal/libs/autofile" "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/service" tmtime "github.com/tendermint/tendermint/libs/time" tmtypes "github.com/tendermint/tendermint/types" ) @@ -185,7 +187,9 @@ func TestWALPeriodicSync(t *testing.T) { require.NoError(t, wal.Start(ctx)) t.Cleanup(func() { if err := wal.Stop(); err != nil { - t.Error(err) + if !errors.Is(err, service.ErrAlreadyStopped) { + t.Error(err) + } } wal.Wait() }) From b10c74647fe68b41630fdfcd53a5fb86e905e284 Mon Sep 17 00:00:00 2001 From: Sam Kleinman Date: Tue, 18 Jan 2022 09:08:20 -0500 Subject: [PATCH 07/11] testing: use noop loger with leakteset in more places (#7604) --- privval/signer_client_test.go | 20 +++++++-------- privval/signer_listener_endpoint_test.go | 6 +++-- rpc/jsonrpc/client/ws_client_test.go | 5 +++- rpc/jsonrpc/jsonrpc_test.go | 32 ++++++++++++++++++------ rpc/jsonrpc/server/http_server_test.go | 18 +++++++++---- rpc/jsonrpc/server/ws_handler_test.go | 5 +++- 6 files changed, 59 insertions(+), 27 deletions(-) diff --git a/privval/signer_client_test.go b/privval/signer_client_test.go index 2867d7be5..b77649684 100644 --- a/privval/signer_client_test.go +++ b/privval/signer_client_test.go @@ -69,7 +69,7 @@ func TestSignerClose(t *testing.T) { bctx, bcancel := context.WithCancel(context.Background()) defer bcancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() for _, tc := range getSignerTestCases(bctx, t, logger) { t.Run(tc.name, func(t *testing.T) { @@ -91,7 +91,7 @@ func TestSignerPing(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() for _, tc := range getSignerTestCases(ctx, t, logger) { err := tc.signerClient.Ping(ctx) @@ -105,7 +105,7 @@ func TestSignerGetPubKey(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() for _, tc := range getSignerTestCases(ctx, t, logger) { t.Run(tc.name, func(t *testing.T) { @@ -135,7 +135,7 @@ func TestSignerProposal(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() for _, tc := range getSignerTestCases(ctx, t, logger) { t.Run(tc.name, func(t *testing.T) { @@ -175,7 +175,7 @@ func TestSignerVote(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() for _, tc := range getSignerTestCases(ctx, t, logger) { t.Run(tc.name, func(t *testing.T) { @@ -218,7 +218,7 @@ func TestSignerVoteResetDeadline(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() for _, tc := range getSignerTestCases(ctx, t, logger) { t.Run(tc.name, func(t *testing.T) { @@ -269,7 +269,7 @@ func TestSignerVoteKeepAlive(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() for _, tc := range getSignerTestCases(ctx, t, logger) { t.Run(tc.name, func(t *testing.T) { @@ -319,7 +319,7 @@ func TestSignerSignProposalErrors(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() for _, tc := range getSignerTestCases(ctx, t, logger) { t.Run(tc.name, func(t *testing.T) { @@ -360,7 +360,7 @@ func TestSignerSignVoteErrors(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() for _, tc := range getSignerTestCases(ctx, t, logger) { t.Run(tc.name, func(t *testing.T) { @@ -426,7 +426,7 @@ func TestSignerUnexpectedResponse(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() for _, tc := range getSignerTestCases(ctx, t, logger) { t.Run(tc.name, func(t *testing.T) { diff --git a/privval/signer_listener_endpoint_test.go b/privval/signer_listener_endpoint_test.go index cf5567561..4c9c31c42 100644 --- a/privval/signer_listener_endpoint_test.go +++ b/privval/signer_listener_endpoint_test.go @@ -40,10 +40,12 @@ func TestSignerRemoteRetryTCPOnly(t *testing.T) { retries = 10 ) + t.Cleanup(leaktest.Check(t)) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() ln, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) @@ -95,7 +97,7 @@ func TestRetryConnToRemoteSigner(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() for _, tc := range getDialerTestCases(t) { var ( diff --git a/rpc/jsonrpc/client/ws_client_test.go b/rpc/jsonrpc/client/ws_client_test.go index 00d138c4d..31cd06743 100644 --- a/rpc/jsonrpc/client/ws_client_test.go +++ b/rpc/jsonrpc/client/ws_client_test.go @@ -220,6 +220,9 @@ func TestNotBlockingOnStop(t *testing.T) { func startClient(ctx context.Context, t *testing.T, addr string) *WSClient { t.Helper() + + t.Cleanup(leaktest.Check(t)) + opts := DefaultWSOptions() opts.SkipMetrics = true c, err := NewWSWithOptions(addr, "/websocket", opts) @@ -227,7 +230,7 @@ func startClient(ctx context.Context, t *testing.T, addr string) *WSClient { require.NoError(t, err) err = c.Start(ctx) require.NoError(t, err) - c.Logger = log.NewTestingLogger(t) + c.Logger = log.NewNopLogger() return c } diff --git a/rpc/jsonrpc/jsonrpc_test.go b/rpc/jsonrpc/jsonrpc_test.go index ed4fb503a..b167e91ec 100644 --- a/rpc/jsonrpc/jsonrpc_test.go +++ b/rpc/jsonrpc/jsonrpc_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/fortytw2/leaktest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -265,8 +266,13 @@ func testWithWSClient(ctx context.Context, t *testing.T, cl *client.WSClient) { //------------- func TestServersAndClientsBasic(t *testing.T) { - bctx, cancel := context.WithCancel(context.Background()) - defer cancel() + // TODO: reenable the leak detector once the test fixture is + // managed in the context of this test. + // + // t.Cleanup(leaktest.Check(t)) + + bctx, bcancel := context.WithCancel(context.Background()) + defer bcancel() serverAddrs := [...]string{tcpAddr, unixAddr} for _, addr := range serverAddrs { @@ -274,14 +280,16 @@ func TestServersAndClientsBasic(t *testing.T) { ctx, cancel := context.WithCancel(bctx) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() cl2, err := client.New(addr) require.NoError(t, err) fmt.Printf("=== testing server on %s using JSONRPC client", addr) testWithHTTPClient(ctx, t, cl2) - cl3, err := client.NewWS(addr, websocketEndpoint) + opts := client.DefaultWSOptions() + opts.SkipMetrics = true + cl3, err := client.NewWSWithOptions(addr, websocketEndpoint, opts) require.NoError(t, err) cl3.Logger = logger err = cl3.Start(ctx) @@ -294,12 +302,16 @@ func TestServersAndClientsBasic(t *testing.T) { } func TestWSNewWSRPCFunc(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cl, err := client.NewWS(tcpAddr, websocketEndpoint) + opts := client.DefaultWSOptions() + opts.SkipMetrics = true + cl, err := client.NewWSWithOptions(tcpAddr, websocketEndpoint, opts) require.NoError(t, err) - cl.Logger = log.NewTestingLogger(t) + cl.Logger = log.NewNopLogger() err = cl.Start(ctx) require.NoError(t, err) t.Cleanup(func() { @@ -329,12 +341,16 @@ func TestWSNewWSRPCFunc(t *testing.T) { // TestWSClientPingPong checks that a client & server exchange pings // & pongs so connection stays alive. func TestWSClientPingPong(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cl, err := client.NewWS(tcpAddr, websocketEndpoint) + opts := client.DefaultWSOptions() + opts.SkipMetrics = true + cl, err := client.NewWSWithOptions(tcpAddr, websocketEndpoint, opts) require.NoError(t, err) - cl.Logger = log.NewTestingLogger(t) + cl.Logger = log.NewNopLogger() err = cl.Start(ctx) require.NoError(t, err) t.Cleanup(func() { diff --git a/rpc/jsonrpc/server/http_server_test.go b/rpc/jsonrpc/server/http_server_test.go index 9fd45c1e4..ca67de708 100644 --- a/rpc/jsonrpc/server/http_server_test.go +++ b/rpc/jsonrpc/server/http_server_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/fortytw2/leaktest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -28,10 +29,12 @@ type sampleResult struct { func TestMaxOpenConnections(t *testing.T) { const max = 5 // max simultaneous connections + t.Cleanup(leaktest.Check(t)) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() // Start the server. var open int32 @@ -78,6 +81,8 @@ func TestMaxOpenConnections(t *testing.T) { } func TestServeTLS(t *testing.T) { + t.Cleanup(leaktest.Check(t)) + ln, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) defer ln.Close() @@ -90,11 +95,14 @@ func TestServeTLS(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() chErr := make(chan error, 1) go func() { - chErr <- ServeTLS(ctx, ln, mux, "test.crt", "test.key", logger, DefaultConfig()) + select { + case chErr <- ServeTLS(ctx, ln, mux, "test.crt", "test.key", logger, DefaultConfig()): + case <-ctx.Done(): + } }() select { @@ -122,7 +130,7 @@ func TestWriteRPCResponse(t *testing.T) { // one argument w := httptest.NewRecorder() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() writeRPCResponse(w, logger, rpctypes.NewRPCSuccessResponse(id, &sampleResult{"hello"})) resp := w.Result() @@ -153,7 +161,7 @@ func TestWriteRPCResponse(t *testing.T) { func TestWriteHTTPResponse(t *testing.T) { w := httptest.NewRecorder() - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() writeHTTPResponse(w, logger, rpctypes.RPCInternalError(rpctypes.JSONRPCIntID(-1), errors.New("foo"))) resp := w.Result() diff --git a/rpc/jsonrpc/server/ws_handler_test.go b/rpc/jsonrpc/server/ws_handler_test.go index b622cf0c6..3d78c0d9b 100644 --- a/rpc/jsonrpc/server/ws_handler_test.go +++ b/rpc/jsonrpc/server/ws_handler_test.go @@ -6,6 +6,7 @@ import ( "net/http/httptest" "testing" + "github.com/fortytw2/leaktest" "github.com/gorilla/websocket" "github.com/stretchr/testify/require" @@ -14,11 +15,13 @@ import ( ) func TestWebsocketManagerHandler(t *testing.T) { - logger := log.NewTestingLogger(t) + logger := log.NewNopLogger() s := newWSServer(t, logger) defer s.Close() + t.Cleanup(leaktest.Check(t)) + // check upgrader works d := websocket.Dialer{} c, dialResp, err := d.Dial("ws://"+s.Listener.Addr().String()+"/websocket", nil) From 0c82ceaa5f7964c13247af9b64d72477af9dc973 Mon Sep 17 00:00:00 2001 From: William Banfield <4561443+williambanfield@users.noreply.github.com> Date: Tue, 18 Jan 2022 09:55:18 -0500 Subject: [PATCH 08/11] consensus: calculate prevote message delay metric (#7551) ## What does this pull request do? This pull requests adds two metrics intended for use in calculating an experimental value for `MessageDelay`. The metrics are as follows: ``` # HELP tendermint_consensus_complete_prevote_message_delay Difference in seconds between the proposal timestamp and the timestamp of the prevote that achieved 100% of the voting power in the prevote step. # TYPE tendermint_consensus_complete_prevote_message_delay gauge tendermint_consensus_complete_prevote_message_delay{chain_id="test-chain-aZbwF1"} 0.013025505 # HELP tendermint_consensus_quorum_prevote_message_delay Difference in seconds between the proposal timestamp and the timestamp of the prevote that achieved a quorum in the prevote step. # TYPE tendermint_consensus_quorum_prevote_message_delay gauge tendermint_consensus_quorum_prevote_message_delay{chain_id="test-chain-aZbwF1"} 0.013025505 ``` ## Why this change? For more information on what these metrics are calculating, see #7202. The aim is to merge to backport these metrics to v0.34 and run nodes on a few popular chains with these metrics to determine the experimental values for `MessageDelay` on these popular chains and use these to select our default `SynchronyParams.MessageDelay` value. ## Why Gauges for the metrics? Gauges allow us to overwrite the metric on each successive observation. We can then capture these metrics over time to track the highest and lowest observed value. --- internal/consensus/metrics.go | 46 +++++++++++++++++++++++++++++------ internal/consensus/state.go | 23 ++++++++++++++++++ types/vote_set.go | 17 +++++++++++++ 3 files changed, 79 insertions(+), 7 deletions(-) diff --git a/internal/consensus/metrics.go b/internal/consensus/metrics.go index a75f1505c..dbefb9a5b 100644 --- a/internal/consensus/metrics.go +++ b/internal/consensus/metrics.go @@ -64,6 +64,22 @@ type Metrics struct { // Histogram of time taken per step annotated with reason that the step proceeded. StepTime metrics.Histogram + + // QuroumPrevoteMessageDelay is the interval in seconds between the proposal + // timestamp and the timestamp of the earliest prevote that achieved a quorum + // during the prevote step. + // + // To compute it, sum the voting power over each prevote received, in increasing + // order of timestamp. The timestamp of the first prevote to increase the sum to + // be above 2/3 of the total voting power of the network defines the endpoint + // the endpoint of the interval. Subtract the proposal timestamp from this endpoint + // to obtain the quorum delay. + QuorumPrevoteMessageDelay metrics.Gauge + + // FullPrevoteMessageDelay is the interval in seconds between the proposal + // timestamp and the timestamp of the latest prevote in a round where 100% + // of the voting power on the network issued prevotes. + FullPrevoteMessageDelay metrics.Gauge } // PrometheusMetrics returns Metrics build using Prometheus client library. @@ -196,6 +212,20 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "step_time", Help: "Time spent per step.", }, append(labels, "step", "reason")).With(labelsAndValues...), + QuorumPrevoteMessageDelay: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "quorum_prevote_message_delay", + Help: "Difference in seconds between the proposal timestamp and the timestamp " + + "of the latest prevote that achieved a quorum in the prevote step.", + }, labels).With(labelsAndValues...), + FullPrevoteMessageDelay: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "full_prevote_message_delay", + Help: "Difference in seconds between the proposal timestamp and the timestamp " + + "of the latest prevote that achieved 100% of the voting power in the prevote step.", + }, labels).With(labelsAndValues...), } } @@ -219,13 +249,15 @@ func NopMetrics() *Metrics { BlockIntervalSeconds: discard.NewHistogram(), - NumTxs: discard.NewGauge(), - BlockSizeBytes: discard.NewHistogram(), - TotalTxs: discard.NewGauge(), - CommittedHeight: discard.NewGauge(), - BlockSyncing: discard.NewGauge(), - StateSyncing: discard.NewGauge(), - BlockParts: discard.NewCounter(), + NumTxs: discard.NewGauge(), + BlockSizeBytes: discard.NewHistogram(), + TotalTxs: discard.NewGauge(), + CommittedHeight: discard.NewGauge(), + BlockSyncing: discard.NewGauge(), + StateSyncing: discard.NewGauge(), + BlockParts: discard.NewCounter(), + QuorumPrevoteMessageDelay: discard.NewGauge(), + FullPrevoteMessageDelay: discard.NewGauge(), } } diff --git a/internal/consensus/state.go b/internal/consensus/state.go index 9d12ce50a..17d032464 100644 --- a/internal/consensus/state.go +++ b/internal/consensus/state.go @@ -9,6 +9,7 @@ import ( "io" "os" "runtime/debug" + "sort" "sync" "time" @@ -1671,6 +1672,8 @@ func (cs *State) finalizeCommit(ctx context.Context, height int64) { return } + cs.calculatePrevoteMessageDelayMetrics() + blockID, ok := cs.Votes.Precommits(cs.CommitRound).TwoThirdsMajority() block, blockParts := cs.ProposalBlock, cs.ProposalBlockParts @@ -2395,6 +2398,26 @@ func (cs *State) checkDoubleSigningRisk(height int64) error { return nil } +func (cs *State) calculatePrevoteMessageDelayMetrics() { + ps := cs.Votes.Prevotes(cs.Round) + pl := ps.List() + sort.Slice(pl, func(i, j int) bool { + return pl[i].Timestamp.Before(pl[j].Timestamp) + }) + var votingPowerSeen int64 + for _, v := range pl { + _, val := cs.Validators.GetByAddress(v.ValidatorAddress) + votingPowerSeen += val.VotingPower + if votingPowerSeen >= cs.Validators.TotalVotingPower()*2/3+1 { + cs.metrics.QuorumPrevoteMessageDelay.Set(v.Timestamp.Sub(cs.Proposal.Timestamp).Seconds()) + break + } + } + if ps.HasAll() { + cs.metrics.FullPrevoteMessageDelay.Set(pl[len(pl)-1].Timestamp.Sub(cs.Proposal.Timestamp).Seconds()) + } +} + //--------------------------------------------------------- func CompareHRS(h1 int64, r1 int32, s1 cstypes.RoundStepType, h2 int64, r2 int32, s2 cstypes.RoundStepType) int { diff --git a/types/vote_set.go b/types/vote_set.go index f125af317..4aae267ad 100644 --- a/types/vote_set.go +++ b/types/vote_set.go @@ -378,6 +378,20 @@ func (voteSet *VoteSet) GetByIndex(valIndex int32) *Vote { return voteSet.votes[valIndex] } +// List returns a copy of the list of votes stored by the VoteSet. +func (voteSet *VoteSet) List() []Vote { + if voteSet == nil || voteSet.votes == nil { + return nil + } + votes := make([]Vote, 0, len(voteSet.votes)) + for i := range voteSet.votes { + if voteSet.votes[i] != nil { + votes = append(votes, *voteSet.votes[i]) + } + } + return votes +} + func (voteSet *VoteSet) GetByAddress(address []byte) *Vote { if voteSet == nil { return nil @@ -423,6 +437,9 @@ func (voteSet *VoteSet) HasTwoThirdsAny() bool { } func (voteSet *VoteSet) HasAll() bool { + if voteSet == nil { + return false + } voteSet.mtx.Lock() defer voteSet.mtx.Unlock() return voteSet.sum == voteSet.valSet.TotalVotingPower() From 7fd97bf44b6357d64d8c0bd6e747bfd0e6a29f56 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Tue, 18 Jan 2022 07:22:50 -0800 Subject: [PATCH 09/11] pex: avert a data race on map access in the reactor (#7614) There was a path on which computing the next delivery time did not hold the lock, defying the admonition on its comment. --- internal/p2p/pex/reactor.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index 82a3e2ca8..87609cc85 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -158,10 +158,13 @@ func (r *Reactor) OnStop() {} func (r *Reactor) processPexCh(ctx context.Context) { timer := time.NewTimer(0) defer timer.Stop() + + r.mtx.Lock() var ( duration = r.calculateNextRequestTime() err error ) + r.mtx.Unlock() incoming := make(chan *p2p.Envelope) go func() { @@ -377,7 +380,8 @@ func (r *Reactor) sendRequestForPeers(ctx context.Context) (time.Duration, error // as possible. As the node becomes more familiar with the network the ratio of // new nodes will plummet to a very small number, meaning the interval expands // to its upper bound. -// CONTRACT: Must use a write lock as nextRequestTime is updated +// +// CONTRACT: The caller must hold r.mtx exclusively when calling this method. func (r *Reactor) calculateNextRequestTime() time.Duration { // check if the peer store is full. If so then there is no need // to send peer requests too often From 417166704a0b2417cd947a86300c66d70e98ee30 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Tue, 18 Jan 2022 12:01:04 -0800 Subject: [PATCH 10/11] pex: do not send nil envelopes to the reactor (#7622) --- internal/p2p/pex/reactor.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index 87609cc85..2faa3130d 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -171,10 +171,14 @@ func (r *Reactor) processPexCh(ctx context.Context) { defer close(incoming) iter := r.pexCh.Receive(ctx) for iter.Next(ctx) { + env := iter.Envelope() + if env == nil { + break + } select { case <-ctx.Done(): return - case incoming <- iter.Envelope(): + case incoming <- env: } } }() From 5cca45bb457b5b4df1dbae3af46b44301dae2dbe Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Tue, 18 Jan 2022 14:32:22 -0800 Subject: [PATCH 11/11] pex: improve handling of closed channels (#7623) Reverts and improves on #7622. The problem turns out not to be on the PEX channel side, but on the pass-through (Go) channel. --- internal/p2p/pex/reactor.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/internal/p2p/pex/reactor.go b/internal/p2p/pex/reactor.go index 2faa3130d..0c256a4f3 100644 --- a/internal/p2p/pex/reactor.go +++ b/internal/p2p/pex/reactor.go @@ -171,14 +171,10 @@ func (r *Reactor) processPexCh(ctx context.Context) { defer close(incoming) iter := r.pexCh.Receive(ctx) for iter.Next(ctx) { - env := iter.Envelope() - if env == nil { - break - } select { case <-ctx.Done(): return - case incoming <- env: + case incoming <- iter.Envelope(): } } }() @@ -198,7 +194,10 @@ func (r *Reactor) processPexCh(ctx context.Context) { } // inbound requests for new peers or responses to requests sent by this // reactor - case envelope := <-incoming: + case envelope, ok := <-incoming: + if !ok { + return + } duration, err = r.handleMessage(ctx, r.pexCh.ID, envelope) if err != nil { r.logger.Error("failed to process message", "ch_id", r.pexCh.ID, "envelope", envelope, "err", err)