mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-12 15:52:50 +00:00
Compare commits
62 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eab4d6d82b | ||
|
|
cf114c98d4 | ||
|
|
49d8c0b174 | ||
|
|
63c9384c8c | ||
|
|
0a1ef4aa0f | ||
|
|
ffb0278d95 | ||
|
|
e6a7757bb4 | ||
|
|
51ccaf6fb8 | ||
|
|
be771f225a | ||
|
|
aad59f2a9a | ||
|
|
d494952c82 | ||
|
|
14e04f7606 | ||
|
|
83f1801625 | ||
|
|
95530285d9 | ||
|
|
7b67ee408b | ||
|
|
7ffd3fff43 | ||
|
|
c013501f45 | ||
|
|
7ec2dff6fd | ||
|
|
b5cad43b26 | ||
|
|
714948505b | ||
|
|
564d6a203a | ||
|
|
c38dbdb640 | ||
|
|
c207fa6eff | ||
|
|
470a23f9b4 | ||
|
|
c4ba93a1e6 | ||
|
|
88946fd6d8 | ||
|
|
ab62fd977f | ||
|
|
0f111b3c5c | ||
|
|
b08f655024 | ||
|
|
d6ea1ed96f | ||
|
|
d06286916d | ||
|
|
0354ea87f7 | ||
|
|
ff2308b5f4 | ||
|
|
0ad70fb69a | ||
|
|
86a581f28f | ||
|
|
76f3db06b8 | ||
|
|
15878dc80c | ||
|
|
aacc71dc29 | ||
|
|
d56fb6ed22 | ||
|
|
8025d402e2 | ||
|
|
513a32a6e3 | ||
|
|
5c9d6d839e | ||
|
|
53fdcfd7e9 | ||
|
|
5f6617db7a | ||
|
|
6d4f18aa8c | ||
|
|
55066ceaad | ||
|
|
58c3e590b4 | ||
|
|
ff9e08a32f | ||
|
|
0335add437 | ||
|
|
5d0e7034e8 | ||
|
|
abc30821f4 | ||
|
|
e89991c445 | ||
|
|
df6df61ea9 | ||
|
|
5ed39fd0b3 | ||
|
|
17b69d4d56 | ||
|
|
5d7e22a53c | ||
|
|
073cd1125e | ||
|
|
7041001fb6 | ||
|
|
c264db339e | ||
|
|
8da43508f8 | ||
|
|
9867a65de7 | ||
|
|
9df117748e |
917
CHANGELOG.md
917
CHANGELOG.md
File diff suppressed because it is too large
Load Diff
@@ -1,9 +1,8 @@
|
||||
## v0.32.8
|
||||
## v0.32.11
|
||||
|
||||
\*\*
|
||||
|
||||
Special thanks to external contributors on this release:
|
||||
@erikgrinaker
|
||||
|
||||
Friendly reminder, we have a [bug bounty
|
||||
program](https://hackerone.com/tendermint).
|
||||
@@ -15,24 +14,10 @@ program](https://hackerone.com/tendermint).
|
||||
- Apps
|
||||
|
||||
- Go API
|
||||
- [libs/pubsub] [\#4070](https://github.com/tendermint/tendermint/pull/4070) `Query#(Matches|Conditions)` returns an error.
|
||||
|
||||
### FEATURES:
|
||||
|
||||
### IMPROVEMENTS:
|
||||
|
||||
- [mempool] [\#4083](https://github.com/tendermint/tendermint/pull/4083) Added TxInfo parameter to CheckTx(), and removed CheckTxWithInfo() (@erikgrinaker)
|
||||
- [mempool] [\#4057](https://github.com/tendermint/tendermint/issues/4057) Include peer ID when logging rejected txns (@erikgrinaker)
|
||||
- [tools] [\#4023](https://github.com/tendermint/tendermint/issues/4023) Improved `tm-monitor` formatting of start time and avg tx throughput (@erikgrinaker)
|
||||
- [libs/pubsub] [\#4070](https://github.com/tendermint/tendermint/pull/4070) No longer panic in `Query#(Matches|Conditions)` preferring to return an error instead.
|
||||
- [libs/pubsub] [\#4070](https://github.com/tendermint/tendermint/pull/4070) Strip out non-numeric characters when attempting to match numeric values.
|
||||
- [p2p] [\#3991](https://github.com/tendermint/tendermint/issues/3991) Log "has been established or dialed" as debug log instead of Error for connected peers (@whunmr)
|
||||
- [rpc] [\#4077](https://github.com/tendermint/tendermint/pull/4077) Added support for `EXISTS` clause to the Websocket query interface.
|
||||
- [privval] Add `SignerDialerEndpointRetryWaitInterval` option (@cosmostuba)
|
||||
- [crypto] Add `RegisterKeyType` to amino to allow external key types registration (@austinabell)
|
||||
|
||||
### BUG FIXES:
|
||||
|
||||
- [tools] [\#4023](https://github.com/tendermint/tendermint/issues/4023) Refresh `tm-monitor` health when validator count is updated (@erikgrinaker)
|
||||
- [state] [\#4104](https://github.com/tendermint/tendermint/pull/4104) txindex/kv: Fsync data to disk immediately after receiving it (@guagualvcha)
|
||||
- [state] [\#4095](https://github.com/tendermint/tendermint/pull/4095) txindex/kv: Return an error if there's one when the user searches for a tx (hash=X) (@hsyis)
|
||||
|
||||
@@ -53,7 +53,7 @@ func testCounter() {
|
||||
}
|
||||
|
||||
fmt.Printf("Running %s test with abci=%s\n", abciApp, abciType)
|
||||
cmd := exec.Command("bash", "-c", fmt.Sprintf("abci-cli %s", abciApp))
|
||||
cmd := exec.Command("bash", "-c", fmt.Sprintf("abci-cli %s", abciApp)) //nolint:gosec
|
||||
cmd.Stdout = os.Stdout
|
||||
if err := cmd.Start(); err != nil {
|
||||
log.Fatalf("starting %q err: %v", abciApp, err)
|
||||
|
||||
@@ -19,6 +19,9 @@ type Metrics struct {
|
||||
// Height of the chain.
|
||||
Height metrics.Gauge
|
||||
|
||||
// ValidatorLastSignedHeight of a validator.
|
||||
ValidatorLastSignedHeight metrics.Gauge
|
||||
|
||||
// Number of rounds.
|
||||
Rounds metrics.Gauge
|
||||
|
||||
@@ -26,6 +29,10 @@ type Metrics struct {
|
||||
Validators metrics.Gauge
|
||||
// Total power of all validators.
|
||||
ValidatorsPower metrics.Gauge
|
||||
// Power of a validator.
|
||||
ValidatorPower metrics.Gauge
|
||||
// Amount of blocks missed by a validator.
|
||||
ValidatorMissedBlocks metrics.Gauge
|
||||
// Number of validators who did not sign.
|
||||
MissingValidators metrics.Gauge
|
||||
// Total power of the missing validators.
|
||||
@@ -81,12 +88,30 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
|
||||
Name: "validators",
|
||||
Help: "Number of validators.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
ValidatorLastSignedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "validator_last_signed_height",
|
||||
Help: "Last signed height for a validator",
|
||||
}, append(labels, "validator_address")).With(labelsAndValues...),
|
||||
ValidatorMissedBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "validator_missed_blocks",
|
||||
Help: "Total missed blocks for a validator",
|
||||
}, append(labels, "validator_address")).With(labelsAndValues...),
|
||||
ValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "validators_power",
|
||||
Help: "Total power of all validators.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
ValidatorPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "validator_power",
|
||||
Help: "Power of a validator",
|
||||
}, append(labels, "validator_address")).With(labelsAndValues...),
|
||||
MissingValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
@@ -163,10 +188,14 @@ func NopMetrics() *Metrics {
|
||||
return &Metrics{
|
||||
Height: discard.NewGauge(),
|
||||
|
||||
ValidatorLastSignedHeight: discard.NewGauge(),
|
||||
|
||||
Rounds: discard.NewGauge(),
|
||||
|
||||
Validators: discard.NewGauge(),
|
||||
ValidatorsPower: discard.NewGauge(),
|
||||
ValidatorPower: discard.NewGauge(),
|
||||
ValidatorMissedBlocks: discard.NewGauge(),
|
||||
MissingValidators: discard.NewGauge(),
|
||||
MissingValidatorsPower: discard.NewGauge(),
|
||||
ByzantineValidators: discard.NewGauge(),
|
||||
|
||||
@@ -57,7 +57,7 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool, options
|
||||
metrics: NopMetrics(),
|
||||
}
|
||||
conR.updateFastSyncingMetric()
|
||||
conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR)
|
||||
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)
|
||||
|
||||
for _, option := range options {
|
||||
option(conR)
|
||||
@@ -501,10 +501,12 @@ OUTER_LOOP:
|
||||
if prs.ProposalBlockParts == nil {
|
||||
blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
|
||||
if blockMeta == nil {
|
||||
panic(fmt.Sprintf("Failed to load block %d when blockStore is at %d",
|
||||
prs.Height, conR.conS.blockStore.Height()))
|
||||
heightLogger.Error("Failed to load block meta",
|
||||
"blockstoreHeight", conR.conS.blockStore.Height())
|
||||
time.Sleep(conR.conS.config.PeerGossipSleepDuration)
|
||||
} else {
|
||||
ps.InitProposalBlockParts(blockMeta.BlockID.PartsHeader)
|
||||
}
|
||||
ps.InitProposalBlockParts(blockMeta.BlockID.PartsHeader)
|
||||
// continue the loop since prs is a copy and not effected by this initialization
|
||||
continue OUTER_LOOP
|
||||
}
|
||||
|
||||
@@ -888,7 +888,7 @@ func TestVoteSetMaj23MessageValidateBasic(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestVoteSetBitsMessageValidateBasic(t *testing.T) {
|
||||
testCases := []struct { // nolint: maligned
|
||||
testCases := []struct {
|
||||
malleateFn func(*VoteSetBitsMessage)
|
||||
expErr string
|
||||
}{
|
||||
|
||||
@@ -1411,7 +1411,8 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
|
||||
// restart).
|
||||
endMsg := EndHeightMessage{height}
|
||||
if err := cs.wal.WriteSync(endMsg); err != nil { // NOTE: fsync
|
||||
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node", endMsg, err))
|
||||
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node",
|
||||
endMsg, err))
|
||||
}
|
||||
|
||||
fail.Fail() // XXX
|
||||
@@ -1458,8 +1459,11 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
|
||||
func (cs *ConsensusState) recordMetrics(height int64, block *types.Block) {
|
||||
cs.metrics.Validators.Set(float64(cs.Validators.Size()))
|
||||
cs.metrics.ValidatorsPower.Set(float64(cs.Validators.TotalVotingPower()))
|
||||
missingValidators := 0
|
||||
missingValidatorsPower := int64(0)
|
||||
|
||||
var (
|
||||
missingValidators = 0
|
||||
missingValidatorsPower int64
|
||||
)
|
||||
for i, val := range cs.Validators.Validators {
|
||||
var vote *types.CommitSig
|
||||
if i < len(block.LastCommit.Precommits) {
|
||||
@@ -1469,9 +1473,22 @@ func (cs *ConsensusState) recordMetrics(height int64, block *types.Block) {
|
||||
missingValidators++
|
||||
missingValidatorsPower += val.VotingPower
|
||||
}
|
||||
|
||||
if cs.privValidator != nil && bytes.Equal(val.Address, cs.privValidator.GetPubKey().Address()) {
|
||||
label := []string{
|
||||
"validator_address", val.Address.String(),
|
||||
}
|
||||
cs.metrics.ValidatorPower.With(label...).Set(float64(val.VotingPower))
|
||||
if vote != nil {
|
||||
cs.metrics.ValidatorLastSignedHeight.With(label...).Set(float64(height))
|
||||
} else {
|
||||
cs.metrics.ValidatorMissedBlocks.With(label...).Add(float64(1))
|
||||
}
|
||||
}
|
||||
}
|
||||
cs.metrics.MissingValidators.Set(float64(missingValidators))
|
||||
cs.metrics.MissingValidatorsPower.Set(float64(missingValidatorsPower))
|
||||
|
||||
cs.metrics.ByzantineValidators.Set(float64(len(block.Evidence.Evidence)))
|
||||
byzantineValidatorsPower := int64(0)
|
||||
for _, ev := range block.Evidence.Evidence {
|
||||
@@ -1492,7 +1509,6 @@ func (cs *ConsensusState) recordMetrics(height int64, block *types.Block) {
|
||||
cs.metrics.BlockSizeBytes.Set(float64(block.Size()))
|
||||
cs.metrics.TotalTxs.Set(float64(block.TotalTxs))
|
||||
cs.metrics.CommittedHeight.Set(float64(block.Height))
|
||||
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
@@ -1617,12 +1633,9 @@ func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerID p2p.ID) (bool, err
|
||||
if bytes.Equal(vote.ValidatorAddress, addr) {
|
||||
cs.Logger.Error(
|
||||
"Found conflicting vote from ourselves. Did you unsafe_reset a validator?",
|
||||
"height",
|
||||
vote.Height,
|
||||
"round",
|
||||
vote.Round,
|
||||
"type",
|
||||
vote.Type)
|
||||
"height", vote.Height,
|
||||
"round", vote.Round,
|
||||
"type", vote.Type)
|
||||
return added, err
|
||||
}
|
||||
cs.evpool.AddEvidence(voteErr.DuplicateVoteEvidence)
|
||||
@@ -1647,14 +1660,10 @@ func (cs *ConsensusState) addVote(
|
||||
peerID p2p.ID) (added bool, err error) {
|
||||
cs.Logger.Debug(
|
||||
"addVote",
|
||||
"voteHeight",
|
||||
vote.Height,
|
||||
"voteType",
|
||||
vote.Type,
|
||||
"valIndex",
|
||||
vote.ValidatorIndex,
|
||||
"csHeight",
|
||||
cs.Height)
|
||||
"voteHeight", vote.Height,
|
||||
"voteType", vote.Type,
|
||||
"valIndex", vote.ValidatorIndex,
|
||||
"csHeight", cs.Height)
|
||||
|
||||
// A precommit for the previous height?
|
||||
// These come in while we wait timeoutCommit
|
||||
|
||||
@@ -18,34 +18,37 @@ Listen address can be changed in the config file (see
|
||||
|
||||
The following metrics are available:
|
||||
|
||||
| **Name** | **Type** | **Since** | **Tags** | **Description** |
|
||||
| --------------------------------------- | --------- | --------- | -------------- | --------------------------------------------------------------- |
|
||||
| consensus\_height | Gauge | 0.21.0 | | Height of the chain |
|
||||
| consensus\_validators | Gauge | 0.21.0 | | Number of validators |
|
||||
| consensus\_validators\_power | Gauge | 0.21.0 | | Total voting power of all validators |
|
||||
| consensus\_missing\_validators | Gauge | 0.21.0 | | Number of validators who did not sign |
|
||||
| consensus\_missing\_validators\_power | Gauge | 0.21.0 | | Total voting power of the missing validators |
|
||||
| consensus\_byzantine\_validators | Gauge | 0.21.0 | | Number of validators who tried to double sign |
|
||||
| consensus\_byzantine\_validators\_power | Gauge | 0.21.0 | | Total voting power of the byzantine validators |
|
||||
| consensus\_block\_interval\_seconds | Histogram | 0.21.0 | | Time between this and last block (Block.Header.Time) in seconds |
|
||||
| consensus\_rounds | Gauge | 0.21.0 | | Number of rounds |
|
||||
| consensus\_num\_txs | Gauge | 0.21.0 | | Number of transactions |
|
||||
| consensus\_block\_parts | counter | on dev | peer\_id | number of blockparts transmitted by peer |
|
||||
| consensus\_latest\_block\_height | gauge | on dev | | /status sync\_info number |
|
||||
| consensus\_fast\_syncing | gauge | on dev | | either 0 (not fast syncing) or 1 (syncing) |
|
||||
| consensus\_total\_txs | Gauge | 0.21.0 | | Total number of transactions committed |
|
||||
| consensus\_block\_size\_bytes | Gauge | 0.21.0 | | Block size in bytes |
|
||||
| p2p\_peers | Gauge | 0.21.0 | | Number of peers node's connected to |
|
||||
| p2p\_peer\_receive\_bytes\_total | counter | on dev | peer\_id, chID | number of bytes per channel received from a given peer |
|
||||
| p2p\_peer\_send\_bytes\_total | counter | on dev | peer\_id, chID | number of bytes per channel sent to a given peer |
|
||||
| p2p\_peer\_pending\_send\_bytes | gauge | on dev | peer\_id | number of pending bytes to be sent to a given peer |
|
||||
| p2p\_num\_txs | gauge | on dev | peer\_id | number of transactions submitted by each peer\_id |
|
||||
| p2p\_pending\_send\_bytes | gauge | on dev | peer\_id | amount of data pending to be sent to peer |
|
||||
| mempool\_size | Gauge | 0.21.0 | | Number of uncommitted transactions |
|
||||
| mempool\_tx\_size\_bytes | histogram | on dev | | transaction sizes in bytes |
|
||||
| mempool\_failed\_txs | counter | on dev | | number of failed transactions |
|
||||
| mempool\_recheck\_times | counter | on dev | | number of transactions rechecked in the mempool |
|
||||
| state\_block\_processing\_time | histogram | on dev | | time between BeginBlock and EndBlock in ms |
|
||||
| **Name** | **Type** | **Since** | **Tags** | **Description** |
|
||||
| -------------------------------------- | --------- | --------- | ------------- | ---------------------------------------------------------------------- |
|
||||
| consensus_height | Gauge | 0.21.0 | | Height of the chain |
|
||||
| consensus_validators | Gauge | 0.21.0 | | Number of validators |
|
||||
| consensus_validators_power | Gauge | 0.21.0 | | Total voting power of all validators |
|
||||
| consensus_validator_power | Gauge | 0.33.0 | | Voting power of the node if in the validator set |
|
||||
| consensus_validator_last_signed_height | Gauge | 0.33.0 | | Last height the node signed a block, if the node is a validator |
|
||||
| consensus_validator_missed_blocks | Gauge | 0.33.0 | | Total amount of blocks missed for the node, if the node is a validator |
|
||||
| consensus_missing_validators | Gauge | 0.21.0 | | Number of validators who did not sign |
|
||||
| consensus_missing_validators_power | Gauge | 0.21.0 | | Total voting power of the missing validators |
|
||||
| consensus_byzantine_validators | Gauge | 0.21.0 | | Number of validators who tried to double sign |
|
||||
| consensus_byzantine_validators_power | Gauge | 0.21.0 | | Total voting power of the byzantine validators |
|
||||
| consensus_block_interval_seconds | Histogram | 0.21.0 | | Time between this and last block (Block.Header.Time) in seconds |
|
||||
| consensus_rounds | Gauge | 0.21.0 | | Number of rounds |
|
||||
| consensus_num_txs | Gauge | 0.21.0 | | Number of transactions |
|
||||
| consensus_total_txs | Gauge | 0.21.0 | | Total number of transactions committed |
|
||||
| consensus_block_parts | counter | on dev | peer_id | number of blockparts transmitted by peer |
|
||||
| consensus_latest_block_height | gauge | on dev | | /status sync_info number |
|
||||
| consensus_fast_syncing | gauge | on dev | | either 0 (not fast syncing) or 1 (syncing) |
|
||||
| consensus_block_size_bytes | Gauge | 0.21.0 | | Block size in bytes |
|
||||
| p2p_peers | Gauge | 0.21.0 | | Number of peers node's connected to |
|
||||
| p2p_peer_receive_bytes_total | counter | on dev | peer_id, chID | number of bytes per channel received from a given peer |
|
||||
| p2p_peer_send_bytes_total | counter | on dev | peer_id, chID | number of bytes per channel sent to a given peer |
|
||||
| p2p_peer_pending_send_bytes | gauge | on dev | peer_id | number of pending bytes to be sent to a given peer |
|
||||
| p2p_num_txs | gauge | on dev | peer_id | number of transactions submitted by each peer_id |
|
||||
| p2p_pending_send_bytes | gauge | on dev | peer_id | amount of data pending to be sent to peer |
|
||||
| mempool_size | Gauge | 0.21.0 | | Number of uncommitted transactions |
|
||||
| mempool_tx_size_bytes | histogram | on dev | | transaction sizes in bytes |
|
||||
| mempool_failed_txs | counter | on dev | | number of failed transactions |
|
||||
| mempool_recheck_times | counter | on dev | | number of transactions rechecked in the mempool |
|
||||
| state_block_processing_time | histogram | on dev | | time between BeginBlock and EndBlock in ms |
|
||||
|
||||
## Useful queries
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ func NewEvidenceReactor(evpool *EvidencePool) *EvidenceReactor {
|
||||
evR := &EvidenceReactor{
|
||||
evpool: evpool,
|
||||
}
|
||||
evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR)
|
||||
evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR)
|
||||
return evR
|
||||
}
|
||||
|
||||
|
||||
@@ -39,6 +39,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
)
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ type mempoolIDs struct {
|
||||
activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter
|
||||
}
|
||||
|
||||
// Reserve searches for the next unused ID and assignes it to the
|
||||
// Reserve searches for the next unused ID and assigns it to the
|
||||
// peer.
|
||||
func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) {
|
||||
ids.mtx.Lock()
|
||||
@@ -110,10 +110,16 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor {
|
||||
mempool: mempool,
|
||||
ids: newMempoolIDs(),
|
||||
}
|
||||
memR.BaseReactor = *p2p.NewBaseReactor("Reactor", memR)
|
||||
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
|
||||
return memR
|
||||
}
|
||||
|
||||
// InitPeer implements Reactor by creating a state for the peer.
|
||||
func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer {
|
||||
memR.ids.ReserveForPeer(peer)
|
||||
return peer
|
||||
}
|
||||
|
||||
// SetLogger sets the Logger on the reactor and the underlying mempool.
|
||||
func (memR *Reactor) SetLogger(l log.Logger) {
|
||||
memR.Logger = l
|
||||
@@ -142,7 +148,6 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
// AddPeer implements Reactor.
|
||||
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
|
||||
func (memR *Reactor) AddPeer(peer p2p.Peer) {
|
||||
memR.ids.ReserveForPeer(peer)
|
||||
go memR.broadcastTxRoutine(peer)
|
||||
}
|
||||
|
||||
|
||||
@@ -223,3 +223,21 @@ func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) {
|
||||
ids.ReserveForPeer(peer)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDontExhaustMaxActiveIDs(t *testing.T) {
|
||||
config := cfg.TestConfig()
|
||||
const N = 1
|
||||
reactors := makeAndConnectReactors(config, N)
|
||||
defer func() {
|
||||
for _, r := range reactors {
|
||||
r.Stop()
|
||||
}
|
||||
}()
|
||||
reactor := reactors[0]
|
||||
|
||||
for i := 0; i < maxActiveIDs+1; i++ {
|
||||
peer := mock.NewPeer(nil)
|
||||
reactor.Receive(MempoolChannel, peer, []byte{0x1, 0x2, 0x3})
|
||||
reactor.AddPeer(peer)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -467,6 +467,11 @@ func createTransport(
|
||||
}
|
||||
|
||||
p2p.MultiplexTransportConnFilters(connFilters...)(transport)
|
||||
|
||||
// Limit the number of incoming connections.
|
||||
max := config.P2P.MaxNumInboundPeers
|
||||
p2p.MultiplexTransportMaxIncomingConnections(max)(transport)
|
||||
|
||||
return transport, peerFilters
|
||||
}
|
||||
|
||||
|
||||
@@ -130,7 +130,7 @@ func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor {
|
||||
lastReceivedRequests: cmn.NewCMap(),
|
||||
crawlPeerInfos: make(map[p2p.ID]crawlPeerInfo),
|
||||
}
|
||||
r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
|
||||
r.BaseReactor = *p2p.NewBaseReactor("PEX", r)
|
||||
return r
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/netutil"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/p2p/conn"
|
||||
@@ -122,11 +123,18 @@ func MultiplexTransportResolver(resolver IPResolver) MultiplexTransportOption {
|
||||
return func(mt *MultiplexTransport) { mt.resolver = resolver }
|
||||
}
|
||||
|
||||
// MultiplexTransportMaxIncomingConnections sets the maximum number of
|
||||
// simultaneous connections (incoming). Default: 0 (unlimited)
|
||||
func MultiplexTransportMaxIncomingConnections(n int) MultiplexTransportOption {
|
||||
return func(mt *MultiplexTransport) { mt.maxIncomingConnections = n }
|
||||
}
|
||||
|
||||
// MultiplexTransport accepts and dials tcp connections and upgrades them to
|
||||
// multiplexed peers.
|
||||
type MultiplexTransport struct {
|
||||
netAddr NetAddress
|
||||
listener net.Listener
|
||||
netAddr NetAddress
|
||||
listener net.Listener
|
||||
maxIncomingConnections int // see MaxIncomingConnections
|
||||
|
||||
acceptc chan accept
|
||||
closec chan struct{}
|
||||
@@ -240,6 +248,10 @@ func (mt *MultiplexTransport) Listen(addr NetAddress) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if mt.maxIncomingConnections > 0 {
|
||||
ln = netutil.LimitListener(ln, mt.maxIncomingConnections)
|
||||
}
|
||||
|
||||
mt.netAddr = addr
|
||||
mt.listener = ln
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"math/rand"
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -134,6 +135,50 @@ func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTransportMultiplexMaxIncomingConnections(t *testing.T) {
|
||||
mt := newMultiplexTransport(
|
||||
emptyNodeInfo(),
|
||||
NodeKey{
|
||||
PrivKey: ed25519.GenPrivKey(),
|
||||
},
|
||||
)
|
||||
id := mt.nodeKey.ID()
|
||||
|
||||
MultiplexTransportMaxIncomingConnections(0)(mt)
|
||||
|
||||
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := mt.Listen(*addr); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
errc := make(chan error)
|
||||
|
||||
go func() {
|
||||
addr := NewNetAddress(id, mt.listener.Addr())
|
||||
|
||||
_, err := addr.Dial()
|
||||
if err != nil {
|
||||
errc <- err
|
||||
return
|
||||
}
|
||||
|
||||
close(errc)
|
||||
}()
|
||||
|
||||
if err := <-errc; err != nil {
|
||||
t.Errorf("connection failed: %v", err)
|
||||
}
|
||||
|
||||
_, err = mt.Accept(peerConfig{})
|
||||
if err == nil || !strings.Contains(err.Error(), "connection reset by peer") {
|
||||
t.Errorf("expected connection reset by peer error, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTransportMultiplexAcceptMultiple(t *testing.T) {
|
||||
mt := testSetupMultiplexTransport(t)
|
||||
laddr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
|
||||
|
||||
@@ -28,6 +28,55 @@ const (
|
||||
protoTCP = "tcp"
|
||||
)
|
||||
|
||||
// Parsed URL structure
|
||||
type parsedURL struct {
|
||||
url.URL
|
||||
}
|
||||
|
||||
// Parse URL and set defaults
|
||||
func newParsedURL(remoteAddr string) (*parsedURL, error) {
|
||||
u, err := url.Parse(remoteAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// default to tcp if nothing specified
|
||||
if u.Scheme == "" {
|
||||
u.Scheme = protoTCP
|
||||
}
|
||||
|
||||
return &parsedURL{*u}, nil
|
||||
}
|
||||
|
||||
// Change protocol to HTTP for unknown protocols and TCP protocol - useful for RPC connections
|
||||
func (u *parsedURL) SetDefaultSchemeHTTP() {
|
||||
// protocol to use for http operations, to support both http and https
|
||||
switch u.Scheme {
|
||||
case protoHTTP, protoHTTPS, protoWS, protoWSS:
|
||||
// known protocols not changed
|
||||
default:
|
||||
// default to http for unknown protocols (ex. tcp)
|
||||
u.Scheme = protoHTTP
|
||||
}
|
||||
}
|
||||
|
||||
// Get full address without the protocol - useful for Dialer connections
|
||||
func (u parsedURL) GetHostWithPath() string {
|
||||
// Remove protocol, userinfo and # fragment, assume opaque is empty
|
||||
return u.Host + u.EscapedPath()
|
||||
}
|
||||
|
||||
// Get a trimmed address - useful for WS connections
|
||||
func (u parsedURL) GetTrimmedHostWithPath() string {
|
||||
// replace / with . for http requests (kvstore domain)
|
||||
return strings.Replace(u.GetHostWithPath(), "/", ".", -1)
|
||||
}
|
||||
|
||||
// Get a trimmed address with protocol - useful as address in RPC connections
|
||||
func (u parsedURL) GetTrimmedURL() string {
|
||||
return u.Scheme + "://" + u.GetTrimmedHostWithPath()
|
||||
}
|
||||
|
||||
// HTTPClient is a common interface for JSONRPCClient and URIClient.
|
||||
type HTTPClient interface {
|
||||
Call(method string, params map[string]interface{}, result interface{}) (interface{}, error)
|
||||
@@ -35,62 +84,6 @@ type HTTPClient interface {
|
||||
SetCodec(*amino.Codec)
|
||||
}
|
||||
|
||||
// protocol - client's protocol (for example, "http", "https", "wss", "ws", "tcp")
|
||||
// trimmedS - rest of the address (for example, "192.0.2.1:25", "[2001:db8::1]:80") with "/" replaced with "."
|
||||
func toClientAddrAndParse(remoteAddr string) (network string, trimmedS string, err error) {
|
||||
protocol, address, err := parseRemoteAddr(remoteAddr)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
// protocol to use for http operations, to support both http and https
|
||||
var clientProtocol string
|
||||
// default to http for unknown protocols (ex. tcp)
|
||||
switch protocol {
|
||||
case protoHTTP, protoHTTPS, protoWS, protoWSS:
|
||||
clientProtocol = protocol
|
||||
default:
|
||||
clientProtocol = protoHTTP
|
||||
}
|
||||
|
||||
// replace / with . for http requests (kvstore domain)
|
||||
trimmedAddress := strings.Replace(address, "/", ".", -1)
|
||||
return clientProtocol, trimmedAddress, nil
|
||||
}
|
||||
|
||||
func toClientAddress(remoteAddr string) (string, error) {
|
||||
clientProtocol, trimmedAddress, err := toClientAddrAndParse(remoteAddr)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return clientProtocol + "://" + trimmedAddress, nil
|
||||
}
|
||||
|
||||
// network - name of the network (for example, "tcp", "unix")
|
||||
// s - rest of the address (for example, "192.0.2.1:25", "[2001:db8::1]:80")
|
||||
// TODO: Deprecate support for IP:PORT or /path/to/socket
|
||||
func parseRemoteAddr(remoteAddr string) (network string, s string, err error) {
|
||||
parts := strings.SplitN(remoteAddr, "://", 2)
|
||||
var protocol, address string
|
||||
switch {
|
||||
case len(parts) == 1:
|
||||
// default to tcp if nothing specified
|
||||
protocol, address = protoTCP, remoteAddr
|
||||
case len(parts) == 2:
|
||||
protocol, address = parts[0], parts[1]
|
||||
default:
|
||||
return "", "", fmt.Errorf("invalid addr: %s", remoteAddr)
|
||||
}
|
||||
|
||||
// accept http(s) as an alias for tcp
|
||||
switch protocol {
|
||||
case protoHTTP, protoHTTPS:
|
||||
protocol = protoTCP
|
||||
}
|
||||
|
||||
return protocol, address, nil
|
||||
}
|
||||
|
||||
func makeErrorDialer(err error) func(string, string) (net.Conn, error) {
|
||||
return func(_ string, _ string) (net.Conn, error) {
|
||||
return nil, err
|
||||
@@ -98,13 +91,21 @@ func makeErrorDialer(err error) func(string, string) (net.Conn, error) {
|
||||
}
|
||||
|
||||
func makeHTTPDialer(remoteAddr string) func(string, string) (net.Conn, error) {
|
||||
protocol, address, err := parseRemoteAddr(remoteAddr)
|
||||
u, err := newParsedURL(remoteAddr)
|
||||
if err != nil {
|
||||
return makeErrorDialer(err)
|
||||
}
|
||||
|
||||
protocol := u.Scheme
|
||||
|
||||
// accept http(s) as an alias for tcp
|
||||
switch protocol {
|
||||
case protoHTTP, protoHTTPS:
|
||||
protocol = protoTCP
|
||||
}
|
||||
|
||||
return func(proto, addr string) (net.Conn, error) {
|
||||
return net.Dial(protocol, address)
|
||||
return net.Dial(protocol, u.GetHostWithPath())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,10 +143,12 @@ type JSONRPCRequestBatch struct {
|
||||
|
||||
// JSONRPCClient takes params as a slice
|
||||
type JSONRPCClient struct {
|
||||
address string
|
||||
client *http.Client
|
||||
id types.JSONRPCStringID
|
||||
cdc *amino.Codec
|
||||
address string
|
||||
username string
|
||||
password string
|
||||
client *http.Client
|
||||
id types.JSONRPCStringID
|
||||
cdc *amino.Codec
|
||||
}
|
||||
|
||||
// JSONRPCCaller implementers can facilitate calling the JSON RPC endpoint.
|
||||
@@ -170,16 +173,24 @@ func NewJSONRPCClientWithHTTPClient(remote string, client *http.Client) *JSONRPC
|
||||
panic("nil http.Client provided")
|
||||
}
|
||||
|
||||
clientAddress, err := toClientAddress(remote)
|
||||
parsedURL, err := newParsedURL(remote)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("invalid remote %s: %s", remote, err))
|
||||
}
|
||||
|
||||
parsedURL.SetDefaultSchemeHTTP()
|
||||
|
||||
address := parsedURL.GetTrimmedURL()
|
||||
username := parsedURL.User.Username()
|
||||
password, _ := parsedURL.User.Password()
|
||||
|
||||
return &JSONRPCClient{
|
||||
address: clientAddress,
|
||||
client: client,
|
||||
id: types.JSONRPCStringID("jsonrpc-client-" + cmn.RandStr(8)),
|
||||
cdc: amino.NewCodec(),
|
||||
address: address,
|
||||
username: username,
|
||||
password: password,
|
||||
client: client,
|
||||
id: types.JSONRPCStringID("jsonrpc-client-" + cmn.RandStr(8)),
|
||||
cdc: amino.NewCodec(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -195,7 +206,15 @@ func (c *JSONRPCClient) Call(method string, params map[string]interface{}, resul
|
||||
return nil, err
|
||||
}
|
||||
requestBuf := bytes.NewBuffer(requestBytes)
|
||||
httpResponse, err := c.client.Post(c.address, "text/json", requestBuf)
|
||||
httpRequest, err := http.NewRequest(http.MethodPost, c.address, requestBuf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
httpRequest.Header.Set("Content-Type", "text/json")
|
||||
if c.username != "" || c.password != "" {
|
||||
httpRequest.SetBasicAuth(c.username, c.password)
|
||||
}
|
||||
httpResponse, err := c.client.Do(httpRequest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -228,7 +247,15 @@ func (c *JSONRPCClient) sendBatch(requests []*jsonRPCBufferedRequest) ([]interfa
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
httpResponse, err := c.client.Post(c.address, "text/json", bytes.NewBuffer(requestBytes))
|
||||
httpRequest, err := http.NewRequest(http.MethodPost, c.address, bytes.NewBuffer(requestBytes))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
httpRequest.Header.Set("Content-Type", "text/json")
|
||||
if c.username != "" || c.password != "" {
|
||||
httpRequest.SetBasicAuth(c.username, c.password)
|
||||
}
|
||||
httpResponse, err := c.client.Do(httpRequest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -315,12 +342,15 @@ type URIClient struct {
|
||||
|
||||
// The function panics if the provided remote is invalid.
|
||||
func NewURIClient(remote string) *URIClient {
|
||||
clientAddress, err := toClientAddress(remote)
|
||||
parsedURL, err := newParsedURL(remote)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("invalid remote %s: %s", remote, err))
|
||||
}
|
||||
|
||||
parsedURL.SetDefaultSchemeHTTP()
|
||||
|
||||
return &URIClient{
|
||||
address: clientAddress,
|
||||
address: parsedURL.GetTrimmedURL(),
|
||||
client: DefaultHTTPClient(remote),
|
||||
cdc: amino.NewCodec(),
|
||||
}
|
||||
|
||||
22
rpc/lib/client/http_client_test.go
Normal file
22
rpc/lib/client/http_client_test.go
Normal file
@@ -0,0 +1,22 @@
|
||||
package rpcclient
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestHTTPClientMakeHTTPDialer(t *testing.T) {
|
||||
remote := []string{"https://foo-bar.com:80", "http://foo-bar.net:80", "https://user:pass@foo-bar.net:80"}
|
||||
|
||||
for _, f := range remote {
|
||||
u, err := newParsedURL(f)
|
||||
require.NoError(t, err)
|
||||
dialFn := makeHTTPDialer(f)
|
||||
|
||||
addr, err := dialFn(u.Scheme, u.GetHostWithPath())
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, addr)
|
||||
}
|
||||
|
||||
}
|
||||
@@ -27,7 +27,7 @@ const (
|
||||
|
||||
// WSClient is a WebSocket client. The methods of WSClient are safe for use by
|
||||
// multiple goroutines.
|
||||
type WSClient struct { // nolint: maligned
|
||||
type WSClient struct {
|
||||
conn *websocket.Conn
|
||||
cdc *amino.Codec
|
||||
|
||||
@@ -80,18 +80,18 @@ type WSClient struct { // nolint: maligned
|
||||
// pong wait time. The endpoint argument must begin with a `/`.
|
||||
// The function panics if the provided address is invalid.
|
||||
func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSClient {
|
||||
protocol, addr, err := toClientAddrAndParse(remoteAddr)
|
||||
parsedURL, err := newParsedURL(remoteAddr)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("invalid remote %s: %s", remoteAddr, err))
|
||||
}
|
||||
// default to ws protocol, unless wss is explicitly specified
|
||||
if protocol != "wss" {
|
||||
protocol = "ws"
|
||||
if parsedURL.Scheme != protoWSS {
|
||||
parsedURL.Scheme = protoWS
|
||||
}
|
||||
|
||||
c := &WSClient{
|
||||
cdc: amino.NewCodec(),
|
||||
Address: addr,
|
||||
Address: parsedURL.GetTrimmedHostWithPath(),
|
||||
Dialer: makeHTTPDialer(remoteAddr),
|
||||
Endpoint: endpoint,
|
||||
PingPongLatencyTimer: metrics.NewTimer(),
|
||||
@@ -100,7 +100,7 @@ func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSCli
|
||||
readWait: defaultReadWait,
|
||||
writeWait: defaultWriteWait,
|
||||
pingPeriod: defaultPingPeriod,
|
||||
protocol: protocol,
|
||||
protocol: parsedURL.Scheme,
|
||||
}
|
||||
c.BaseService = *cmn.NewBaseService(nil, "WSClient", c)
|
||||
for _, option := range options {
|
||||
|
||||
@@ -64,7 +64,8 @@ func TestWSClientReconnectsAfterReadFailure(t *testing.T) {
|
||||
s := httptest.NewServer(h)
|
||||
defer s.Close()
|
||||
|
||||
c := startClient(t, s.Listener.Addr().String())
|
||||
// https://github.com/golang/go/issues/19297#issuecomment-282651469
|
||||
c := startClient(t, "//" + s.Listener.Addr().String())
|
||||
defer c.Stop()
|
||||
|
||||
wg.Add(1)
|
||||
@@ -96,7 +97,8 @@ func TestWSClientReconnectsAfterWriteFailure(t *testing.T) {
|
||||
h := &myHandler{}
|
||||
s := httptest.NewServer(h)
|
||||
|
||||
c := startClient(t, s.Listener.Addr().String())
|
||||
// https://github.com/golang/go/issues/19297#issuecomment-282651469
|
||||
c := startClient(t, "//" + s.Listener.Addr().String())
|
||||
defer c.Stop()
|
||||
|
||||
wg.Add(2)
|
||||
@@ -124,7 +126,8 @@ func TestWSClientReconnectFailure(t *testing.T) {
|
||||
h := &myHandler{}
|
||||
s := httptest.NewServer(h)
|
||||
|
||||
c := startClient(t, s.Listener.Addr().String())
|
||||
// https://github.com/golang/go/issues/19297#issuecomment-282651469
|
||||
c := startClient(t, "//" + s.Listener.Addr().String())
|
||||
defer c.Stop()
|
||||
|
||||
go func() {
|
||||
@@ -173,7 +176,7 @@ func TestWSClientReconnectFailure(t *testing.T) {
|
||||
func TestNotBlockingOnStop(t *testing.T) {
|
||||
timeout := 2 * time.Second
|
||||
s := httptest.NewServer(&myHandler{})
|
||||
c := startClient(t, s.Listener.Addr().String())
|
||||
c := startClient(t, "//" + s.Listener.Addr().String())
|
||||
c.Call(context.Background(), "a", make(map[string]interface{}))
|
||||
// Let the readRoutine get around to blocking
|
||||
time.Sleep(time.Second)
|
||||
|
||||
@@ -20,7 +20,8 @@ const (
|
||||
// Must be a string because scripts like dist.sh read this file.
|
||||
// XXX: Don't change the name of this variable or you will break
|
||||
// automation :)
|
||||
TMCoreSemVer = "0.32.7"
|
||||
|
||||
TMCoreSemVer = "0.32.10"
|
||||
|
||||
// ABCISemVer is the semantic version of the ABCI library
|
||||
ABCISemVer = "0.16.1"
|
||||
|
||||
Reference in New Issue
Block a user