mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-11 23:32:50 +00:00
Compare commits
67 Commits
wb/undo-qu
...
release/0.
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b8c6745e47 | ||
|
|
969b1b1548 | ||
|
|
832babe790 | ||
|
|
00b0b87476 | ||
|
|
d85e2e52d2 | ||
|
|
eab4d6d82b | ||
|
|
cf114c98d4 | ||
|
|
49d8c0b174 | ||
|
|
63c9384c8c | ||
|
|
0a1ef4aa0f | ||
|
|
ffb0278d95 | ||
|
|
e6a7757bb4 | ||
|
|
51ccaf6fb8 | ||
|
|
be771f225a | ||
|
|
aad59f2a9a | ||
|
|
d494952c82 | ||
|
|
14e04f7606 | ||
|
|
83f1801625 | ||
|
|
95530285d9 | ||
|
|
7b67ee408b | ||
|
|
7ffd3fff43 | ||
|
|
c013501f45 | ||
|
|
7ec2dff6fd | ||
|
|
b5cad43b26 | ||
|
|
714948505b | ||
|
|
564d6a203a | ||
|
|
c38dbdb640 | ||
|
|
c207fa6eff | ||
|
|
470a23f9b4 | ||
|
|
c4ba93a1e6 | ||
|
|
88946fd6d8 | ||
|
|
ab62fd977f | ||
|
|
0f111b3c5c | ||
|
|
b08f655024 | ||
|
|
d6ea1ed96f | ||
|
|
d06286916d | ||
|
|
0354ea87f7 | ||
|
|
ff2308b5f4 | ||
|
|
0ad70fb69a | ||
|
|
86a581f28f | ||
|
|
76f3db06b8 | ||
|
|
15878dc80c | ||
|
|
aacc71dc29 | ||
|
|
d56fb6ed22 | ||
|
|
8025d402e2 | ||
|
|
513a32a6e3 | ||
|
|
5c9d6d839e | ||
|
|
53fdcfd7e9 | ||
|
|
5f6617db7a | ||
|
|
6d4f18aa8c | ||
|
|
55066ceaad | ||
|
|
58c3e590b4 | ||
|
|
ff9e08a32f | ||
|
|
0335add437 | ||
|
|
5d0e7034e8 | ||
|
|
abc30821f4 | ||
|
|
e89991c445 | ||
|
|
df6df61ea9 | ||
|
|
5ed39fd0b3 | ||
|
|
17b69d4d56 | ||
|
|
5d7e22a53c | ||
|
|
073cd1125e | ||
|
|
7041001fb6 | ||
|
|
c264db339e | ||
|
|
8da43508f8 | ||
|
|
9867a65de7 | ||
|
|
9df117748e |
929
CHANGELOG.md
929
CHANGELOG.md
File diff suppressed because it is too large
Load Diff
@@ -1,9 +1,8 @@
|
||||
## v0.32.8
|
||||
## v0.32.12
|
||||
|
||||
\*\*
|
||||
|
||||
Special thanks to external contributors on this release:
|
||||
@erikgrinaker
|
||||
|
||||
Friendly reminder, we have a [bug bounty
|
||||
program](https://hackerone.com/tendermint).
|
||||
@@ -15,24 +14,10 @@ program](https://hackerone.com/tendermint).
|
||||
- Apps
|
||||
|
||||
- Go API
|
||||
- [libs/pubsub] [\#4070](https://github.com/tendermint/tendermint/pull/4070) `Query#(Matches|Conditions)` returns an error.
|
||||
|
||||
### FEATURES:
|
||||
|
||||
### IMPROVEMENTS:
|
||||
|
||||
- [mempool] [\#4083](https://github.com/tendermint/tendermint/pull/4083) Added TxInfo parameter to CheckTx(), and removed CheckTxWithInfo() (@erikgrinaker)
|
||||
- [mempool] [\#4057](https://github.com/tendermint/tendermint/issues/4057) Include peer ID when logging rejected txns (@erikgrinaker)
|
||||
- [tools] [\#4023](https://github.com/tendermint/tendermint/issues/4023) Improved `tm-monitor` formatting of start time and avg tx throughput (@erikgrinaker)
|
||||
- [libs/pubsub] [\#4070](https://github.com/tendermint/tendermint/pull/4070) No longer panic in `Query#(Matches|Conditions)` preferring to return an error instead.
|
||||
- [libs/pubsub] [\#4070](https://github.com/tendermint/tendermint/pull/4070) Strip out non-numeric characters when attempting to match numeric values.
|
||||
- [p2p] [\#3991](https://github.com/tendermint/tendermint/issues/3991) Log "has been established or dialed" as debug log instead of Error for connected peers (@whunmr)
|
||||
- [rpc] [\#4077](https://github.com/tendermint/tendermint/pull/4077) Added support for `EXISTS` clause to the Websocket query interface.
|
||||
- [privval] Add `SignerDialerEndpointRetryWaitInterval` option (@cosmostuba)
|
||||
- [crypto] Add `RegisterKeyType` to amino to allow external key types registration (@austinabell)
|
||||
|
||||
### BUG FIXES:
|
||||
|
||||
- [tools] [\#4023](https://github.com/tendermint/tendermint/issues/4023) Refresh `tm-monitor` health when validator count is updated (@erikgrinaker)
|
||||
- [state] [\#4104](https://github.com/tendermint/tendermint/pull/4104) txindex/kv: Fsync data to disk immediately after receiving it (@guagualvcha)
|
||||
- [state] [\#4095](https://github.com/tendermint/tendermint/pull/4095) txindex/kv: Return an error if there's one when the user searches for a tx (hash=X) (@hsyis)
|
||||
|
||||
@@ -53,7 +53,7 @@ func testCounter() {
|
||||
}
|
||||
|
||||
fmt.Printf("Running %s test with abci=%s\n", abciApp, abciType)
|
||||
cmd := exec.Command("bash", "-c", fmt.Sprintf("abci-cli %s", abciApp))
|
||||
cmd := exec.Command("bash", "-c", fmt.Sprintf("abci-cli %s", abciApp)) //nolint:gosec
|
||||
cmd.Stdout = os.Stdout
|
||||
if err := cmd.Start(); err != nil {
|
||||
log.Fatalf("starting %q err: %v", abciApp, err)
|
||||
|
||||
@@ -19,6 +19,9 @@ type Metrics struct {
|
||||
// Height of the chain.
|
||||
Height metrics.Gauge
|
||||
|
||||
// ValidatorLastSignedHeight of a validator.
|
||||
ValidatorLastSignedHeight metrics.Gauge
|
||||
|
||||
// Number of rounds.
|
||||
Rounds metrics.Gauge
|
||||
|
||||
@@ -26,6 +29,10 @@ type Metrics struct {
|
||||
Validators metrics.Gauge
|
||||
// Total power of all validators.
|
||||
ValidatorsPower metrics.Gauge
|
||||
// Power of a validator.
|
||||
ValidatorPower metrics.Gauge
|
||||
// Amount of blocks missed by a validator.
|
||||
ValidatorMissedBlocks metrics.Gauge
|
||||
// Number of validators who did not sign.
|
||||
MissingValidators metrics.Gauge
|
||||
// Total power of the missing validators.
|
||||
@@ -81,12 +88,30 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
|
||||
Name: "validators",
|
||||
Help: "Number of validators.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
ValidatorLastSignedHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "validator_last_signed_height",
|
||||
Help: "Last signed height for a validator",
|
||||
}, append(labels, "validator_address")).With(labelsAndValues...),
|
||||
ValidatorMissedBlocks: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "validator_missed_blocks",
|
||||
Help: "Total missed blocks for a validator",
|
||||
}, append(labels, "validator_address")).With(labelsAndValues...),
|
||||
ValidatorsPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "validators_power",
|
||||
Help: "Total power of all validators.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
ValidatorPower: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "validator_power",
|
||||
Help: "Power of a validator",
|
||||
}, append(labels, "validator_address")).With(labelsAndValues...),
|
||||
MissingValidators: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
@@ -163,10 +188,14 @@ func NopMetrics() *Metrics {
|
||||
return &Metrics{
|
||||
Height: discard.NewGauge(),
|
||||
|
||||
ValidatorLastSignedHeight: discard.NewGauge(),
|
||||
|
||||
Rounds: discard.NewGauge(),
|
||||
|
||||
Validators: discard.NewGauge(),
|
||||
ValidatorsPower: discard.NewGauge(),
|
||||
ValidatorPower: discard.NewGauge(),
|
||||
ValidatorMissedBlocks: discard.NewGauge(),
|
||||
MissingValidators: discard.NewGauge(),
|
||||
MissingValidatorsPower: discard.NewGauge(),
|
||||
ByzantineValidators: discard.NewGauge(),
|
||||
|
||||
@@ -57,7 +57,7 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool, options
|
||||
metrics: NopMetrics(),
|
||||
}
|
||||
conR.updateFastSyncingMetric()
|
||||
conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR)
|
||||
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)
|
||||
|
||||
for _, option := range options {
|
||||
option(conR)
|
||||
@@ -501,10 +501,12 @@ OUTER_LOOP:
|
||||
if prs.ProposalBlockParts == nil {
|
||||
blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
|
||||
if blockMeta == nil {
|
||||
panic(fmt.Sprintf("Failed to load block %d when blockStore is at %d",
|
||||
prs.Height, conR.conS.blockStore.Height()))
|
||||
heightLogger.Error("Failed to load block meta",
|
||||
"blockstoreHeight", conR.conS.blockStore.Height())
|
||||
time.Sleep(conR.conS.config.PeerGossipSleepDuration)
|
||||
} else {
|
||||
ps.InitProposalBlockParts(blockMeta.BlockID.PartsHeader)
|
||||
}
|
||||
ps.InitProposalBlockParts(blockMeta.BlockID.PartsHeader)
|
||||
// continue the loop since prs is a copy and not effected by this initialization
|
||||
continue OUTER_LOOP
|
||||
}
|
||||
|
||||
@@ -888,7 +888,7 @@ func TestVoteSetMaj23MessageValidateBasic(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestVoteSetBitsMessageValidateBasic(t *testing.T) {
|
||||
testCases := []struct { // nolint: maligned
|
||||
testCases := []struct {
|
||||
malleateFn func(*VoteSetBitsMessage)
|
||||
expErr string
|
||||
}{
|
||||
|
||||
@@ -1411,7 +1411,8 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
|
||||
// restart).
|
||||
endMsg := EndHeightMessage{height}
|
||||
if err := cs.wal.WriteSync(endMsg); err != nil { // NOTE: fsync
|
||||
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node", endMsg, err))
|
||||
panic(fmt.Sprintf("Failed to write %v msg to consensus wal due to %v. Check your FS and restart the node",
|
||||
endMsg, err))
|
||||
}
|
||||
|
||||
fail.Fail() // XXX
|
||||
@@ -1458,8 +1459,11 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
|
||||
func (cs *ConsensusState) recordMetrics(height int64, block *types.Block) {
|
||||
cs.metrics.Validators.Set(float64(cs.Validators.Size()))
|
||||
cs.metrics.ValidatorsPower.Set(float64(cs.Validators.TotalVotingPower()))
|
||||
missingValidators := 0
|
||||
missingValidatorsPower := int64(0)
|
||||
|
||||
var (
|
||||
missingValidators = 0
|
||||
missingValidatorsPower int64
|
||||
)
|
||||
for i, val := range cs.Validators.Validators {
|
||||
var vote *types.CommitSig
|
||||
if i < len(block.LastCommit.Precommits) {
|
||||
@@ -1469,9 +1473,22 @@ func (cs *ConsensusState) recordMetrics(height int64, block *types.Block) {
|
||||
missingValidators++
|
||||
missingValidatorsPower += val.VotingPower
|
||||
}
|
||||
|
||||
if cs.privValidator != nil && bytes.Equal(val.Address, cs.privValidator.GetPubKey().Address()) {
|
||||
label := []string{
|
||||
"validator_address", val.Address.String(),
|
||||
}
|
||||
cs.metrics.ValidatorPower.With(label...).Set(float64(val.VotingPower))
|
||||
if vote != nil {
|
||||
cs.metrics.ValidatorLastSignedHeight.With(label...).Set(float64(height))
|
||||
} else {
|
||||
cs.metrics.ValidatorMissedBlocks.With(label...).Add(float64(1))
|
||||
}
|
||||
}
|
||||
}
|
||||
cs.metrics.MissingValidators.Set(float64(missingValidators))
|
||||
cs.metrics.MissingValidatorsPower.Set(float64(missingValidatorsPower))
|
||||
|
||||
cs.metrics.ByzantineValidators.Set(float64(len(block.Evidence.Evidence)))
|
||||
byzantineValidatorsPower := int64(0)
|
||||
for _, ev := range block.Evidence.Evidence {
|
||||
@@ -1492,7 +1509,6 @@ func (cs *ConsensusState) recordMetrics(height int64, block *types.Block) {
|
||||
cs.metrics.BlockSizeBytes.Set(float64(block.Size()))
|
||||
cs.metrics.TotalTxs.Set(float64(block.TotalTxs))
|
||||
cs.metrics.CommittedHeight.Set(float64(block.Height))
|
||||
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
@@ -1617,12 +1633,9 @@ func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerID p2p.ID) (bool, err
|
||||
if bytes.Equal(vote.ValidatorAddress, addr) {
|
||||
cs.Logger.Error(
|
||||
"Found conflicting vote from ourselves. Did you unsafe_reset a validator?",
|
||||
"height",
|
||||
vote.Height,
|
||||
"round",
|
||||
vote.Round,
|
||||
"type",
|
||||
vote.Type)
|
||||
"height", vote.Height,
|
||||
"round", vote.Round,
|
||||
"type", vote.Type)
|
||||
return added, err
|
||||
}
|
||||
cs.evpool.AddEvidence(voteErr.DuplicateVoteEvidence)
|
||||
@@ -1647,14 +1660,10 @@ func (cs *ConsensusState) addVote(
|
||||
peerID p2p.ID) (added bool, err error) {
|
||||
cs.Logger.Debug(
|
||||
"addVote",
|
||||
"voteHeight",
|
||||
vote.Height,
|
||||
"voteType",
|
||||
vote.Type,
|
||||
"valIndex",
|
||||
vote.ValidatorIndex,
|
||||
"csHeight",
|
||||
cs.Height)
|
||||
"voteHeight", vote.Height,
|
||||
"voteType", vote.Type,
|
||||
"valIndex", vote.ValidatorIndex,
|
||||
"csHeight", cs.Height)
|
||||
|
||||
// A precommit for the previous height?
|
||||
// These come in while we wait timeoutCommit
|
||||
|
||||
@@ -18,34 +18,37 @@ Listen address can be changed in the config file (see
|
||||
|
||||
The following metrics are available:
|
||||
|
||||
| **Name** | **Type** | **Since** | **Tags** | **Description** |
|
||||
| --------------------------------------- | --------- | --------- | -------------- | --------------------------------------------------------------- |
|
||||
| consensus\_height | Gauge | 0.21.0 | | Height of the chain |
|
||||
| consensus\_validators | Gauge | 0.21.0 | | Number of validators |
|
||||
| consensus\_validators\_power | Gauge | 0.21.0 | | Total voting power of all validators |
|
||||
| consensus\_missing\_validators | Gauge | 0.21.0 | | Number of validators who did not sign |
|
||||
| consensus\_missing\_validators\_power | Gauge | 0.21.0 | | Total voting power of the missing validators |
|
||||
| consensus\_byzantine\_validators | Gauge | 0.21.0 | | Number of validators who tried to double sign |
|
||||
| consensus\_byzantine\_validators\_power | Gauge | 0.21.0 | | Total voting power of the byzantine validators |
|
||||
| consensus\_block\_interval\_seconds | Histogram | 0.21.0 | | Time between this and last block (Block.Header.Time) in seconds |
|
||||
| consensus\_rounds | Gauge | 0.21.0 | | Number of rounds |
|
||||
| consensus\_num\_txs | Gauge | 0.21.0 | | Number of transactions |
|
||||
| consensus\_block\_parts | counter | on dev | peer\_id | number of blockparts transmitted by peer |
|
||||
| consensus\_latest\_block\_height | gauge | on dev | | /status sync\_info number |
|
||||
| consensus\_fast\_syncing | gauge | on dev | | either 0 (not fast syncing) or 1 (syncing) |
|
||||
| consensus\_total\_txs | Gauge | 0.21.0 | | Total number of transactions committed |
|
||||
| consensus\_block\_size\_bytes | Gauge | 0.21.0 | | Block size in bytes |
|
||||
| p2p\_peers | Gauge | 0.21.0 | | Number of peers node's connected to |
|
||||
| p2p\_peer\_receive\_bytes\_total | counter | on dev | peer\_id, chID | number of bytes per channel received from a given peer |
|
||||
| p2p\_peer\_send\_bytes\_total | counter | on dev | peer\_id, chID | number of bytes per channel sent to a given peer |
|
||||
| p2p\_peer\_pending\_send\_bytes | gauge | on dev | peer\_id | number of pending bytes to be sent to a given peer |
|
||||
| p2p\_num\_txs | gauge | on dev | peer\_id | number of transactions submitted by each peer\_id |
|
||||
| p2p\_pending\_send\_bytes | gauge | on dev | peer\_id | amount of data pending to be sent to peer |
|
||||
| mempool\_size | Gauge | 0.21.0 | | Number of uncommitted transactions |
|
||||
| mempool\_tx\_size\_bytes | histogram | on dev | | transaction sizes in bytes |
|
||||
| mempool\_failed\_txs | counter | on dev | | number of failed transactions |
|
||||
| mempool\_recheck\_times | counter | on dev | | number of transactions rechecked in the mempool |
|
||||
| state\_block\_processing\_time | histogram | on dev | | time between BeginBlock and EndBlock in ms |
|
||||
| **Name** | **Type** | **Since** | **Tags** | **Description** |
|
||||
| -------------------------------------- | --------- | --------- | ------------- | ---------------------------------------------------------------------- |
|
||||
| consensus_height | Gauge | 0.21.0 | | Height of the chain |
|
||||
| consensus_validators | Gauge | 0.21.0 | | Number of validators |
|
||||
| consensus_validators_power | Gauge | 0.21.0 | | Total voting power of all validators |
|
||||
| consensus_validator_power | Gauge | 0.33.0 | | Voting power of the node if in the validator set |
|
||||
| consensus_validator_last_signed_height | Gauge | 0.33.0 | | Last height the node signed a block, if the node is a validator |
|
||||
| consensus_validator_missed_blocks | Gauge | 0.33.0 | | Total amount of blocks missed for the node, if the node is a validator |
|
||||
| consensus_missing_validators | Gauge | 0.21.0 | | Number of validators who did not sign |
|
||||
| consensus_missing_validators_power | Gauge | 0.21.0 | | Total voting power of the missing validators |
|
||||
| consensus_byzantine_validators | Gauge | 0.21.0 | | Number of validators who tried to double sign |
|
||||
| consensus_byzantine_validators_power | Gauge | 0.21.0 | | Total voting power of the byzantine validators |
|
||||
| consensus_block_interval_seconds | Histogram | 0.21.0 | | Time between this and last block (Block.Header.Time) in seconds |
|
||||
| consensus_rounds | Gauge | 0.21.0 | | Number of rounds |
|
||||
| consensus_num_txs | Gauge | 0.21.0 | | Number of transactions |
|
||||
| consensus_total_txs | Gauge | 0.21.0 | | Total number of transactions committed |
|
||||
| consensus_block_parts | counter | on dev | peer_id | number of blockparts transmitted by peer |
|
||||
| consensus_latest_block_height | gauge | on dev | | /status sync_info number |
|
||||
| consensus_fast_syncing | gauge | on dev | | either 0 (not fast syncing) or 1 (syncing) |
|
||||
| consensus_block_size_bytes | Gauge | 0.21.0 | | Block size in bytes |
|
||||
| p2p_peers | Gauge | 0.21.0 | | Number of peers node's connected to |
|
||||
| p2p_peer_receive_bytes_total | counter | on dev | peer_id, chID | number of bytes per channel received from a given peer |
|
||||
| p2p_peer_send_bytes_total | counter | on dev | peer_id, chID | number of bytes per channel sent to a given peer |
|
||||
| p2p_peer_pending_send_bytes | gauge | on dev | peer_id | number of pending bytes to be sent to a given peer |
|
||||
| p2p_num_txs | gauge | on dev | peer_id | number of transactions submitted by each peer_id |
|
||||
| p2p_pending_send_bytes | gauge | on dev | peer_id | amount of data pending to be sent to peer |
|
||||
| mempool_size | Gauge | 0.21.0 | | Number of uncommitted transactions |
|
||||
| mempool_tx_size_bytes | histogram | on dev | | transaction sizes in bytes |
|
||||
| mempool_failed_txs | counter | on dev | | number of failed transactions |
|
||||
| mempool_recheck_times | counter | on dev | | number of transactions rechecked in the mempool |
|
||||
| state_block_processing_time | histogram | on dev | | time between BeginBlock and EndBlock in ms |
|
||||
|
||||
## Useful queries
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ func NewEvidenceReactor(evpool *EvidencePool) *EvidenceReactor {
|
||||
evR := &EvidenceReactor{
|
||||
evpool: evpool,
|
||||
}
|
||||
evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR)
|
||||
evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR)
|
||||
return evR
|
||||
}
|
||||
|
||||
|
||||
@@ -39,6 +39,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
)
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ type mempoolIDs struct {
|
||||
activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter
|
||||
}
|
||||
|
||||
// Reserve searches for the next unused ID and assignes it to the
|
||||
// Reserve searches for the next unused ID and assigns it to the
|
||||
// peer.
|
||||
func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) {
|
||||
ids.mtx.Lock()
|
||||
@@ -110,10 +110,16 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor {
|
||||
mempool: mempool,
|
||||
ids: newMempoolIDs(),
|
||||
}
|
||||
memR.BaseReactor = *p2p.NewBaseReactor("Reactor", memR)
|
||||
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
|
||||
return memR
|
||||
}
|
||||
|
||||
// InitPeer implements Reactor by creating a state for the peer.
|
||||
func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer {
|
||||
memR.ids.ReserveForPeer(peer)
|
||||
return peer
|
||||
}
|
||||
|
||||
// SetLogger sets the Logger on the reactor and the underlying mempool.
|
||||
func (memR *Reactor) SetLogger(l log.Logger) {
|
||||
memR.Logger = l
|
||||
@@ -142,7 +148,6 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
|
||||
// AddPeer implements Reactor.
|
||||
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
|
||||
func (memR *Reactor) AddPeer(peer p2p.Peer) {
|
||||
memR.ids.ReserveForPeer(peer)
|
||||
go memR.broadcastTxRoutine(peer)
|
||||
}
|
||||
|
||||
|
||||
@@ -223,3 +223,21 @@ func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) {
|
||||
ids.ReserveForPeer(peer)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDontExhaustMaxActiveIDs(t *testing.T) {
|
||||
config := cfg.TestConfig()
|
||||
const N = 1
|
||||
reactors := makeAndConnectReactors(config, N)
|
||||
defer func() {
|
||||
for _, r := range reactors {
|
||||
r.Stop()
|
||||
}
|
||||
}()
|
||||
reactor := reactors[0]
|
||||
|
||||
for i := 0; i < maxActiveIDs+1; i++ {
|
||||
peer := mock.NewPeer(nil)
|
||||
reactor.Receive(MempoolChannel, peer, []byte{0x1, 0x2, 0x3})
|
||||
reactor.AddPeer(peer)
|
||||
}
|
||||
}
|
||||
|
||||
19
node/node.go
19
node/node.go
@@ -467,6 +467,11 @@ func createTransport(
|
||||
}
|
||||
|
||||
p2p.MultiplexTransportConnFilters(connFilters...)(transport)
|
||||
|
||||
// Limit the number of incoming connections.
|
||||
max := config.P2P.MaxNumInboundPeers
|
||||
p2p.MultiplexTransportMaxIncomingConnections(max)(transport)
|
||||
|
||||
return transport, peerFilters
|
||||
}
|
||||
|
||||
@@ -1187,7 +1192,19 @@ func createAndStartPrivValidatorSocketClient(
|
||||
return nil, errors.Wrap(err, "failed to start private validator")
|
||||
}
|
||||
|
||||
return pvsc, nil
|
||||
// try to get a pubkey from private validate first time
|
||||
pubKey := pvsc.GetPubKey()
|
||||
if pubKey == nil {
|
||||
return nil, errors.New("could not retrieve public key from private validator")
|
||||
}
|
||||
|
||||
const (
|
||||
retries = 50 // 50 * 100ms = 5s total
|
||||
timeout = 100 * time.Millisecond
|
||||
)
|
||||
pvscWithRetries := privval.NewRetrySignerClient(pvsc, retries, timeout)
|
||||
|
||||
return pvscWithRetries, nil
|
||||
}
|
||||
|
||||
// splitAndTrimEmpty slices s into all subslices separated by sep and returns a
|
||||
|
||||
@@ -158,7 +158,7 @@ func TestNodeSetPrivValTCP(t *testing.T) {
|
||||
|
||||
n, err := DefaultNewNode(config, log.TestingLogger())
|
||||
require.NoError(t, err)
|
||||
assert.IsType(t, &privval.SignerClient{}, n.PrivValidator())
|
||||
assert.IsType(t, &privval.RetrySignerClient{}, n.PrivValidator())
|
||||
}
|
||||
|
||||
// address without a protocol must result in error
|
||||
@@ -202,7 +202,7 @@ func TestNodeSetPrivValIPC(t *testing.T) {
|
||||
|
||||
n, err := DefaultNewNode(config, log.TestingLogger())
|
||||
require.NoError(t, err)
|
||||
assert.IsType(t, &privval.SignerClient{}, n.PrivValidator())
|
||||
assert.IsType(t, &privval.RetrySignerClient{}, n.PrivValidator())
|
||||
}
|
||||
|
||||
// testFreeAddr claims a free port so we don't block on listener being ready.
|
||||
|
||||
@@ -313,21 +313,43 @@ var rfc4862 = net.IPNet{IP: net.ParseIP("FE80::"), Mask: net.CIDRMask(64, 128)}
|
||||
var rfc6052 = net.IPNet{IP: net.ParseIP("64:FF9B::"), Mask: net.CIDRMask(96, 128)}
|
||||
var rfc6145 = net.IPNet{IP: net.ParseIP("::FFFF:0:0:0"), Mask: net.CIDRMask(96, 128)}
|
||||
var zero4 = net.IPNet{IP: net.ParseIP("0.0.0.0"), Mask: net.CIDRMask(8, 32)}
|
||||
var (
|
||||
// onionCatNet defines the IPv6 address block used to support Tor.
|
||||
// bitcoind encodes a .onion address as a 16 byte number by decoding the
|
||||
// address prior to the .onion (i.e. the key hash) base32 into a ten
|
||||
// byte number. It then stores the first 6 bytes of the address as
|
||||
// 0xfd, 0x87, 0xd8, 0x7e, 0xeb, 0x43.
|
||||
//
|
||||
// This is the same range used by OnionCat, which is part part of the
|
||||
// RFC4193 unique local IPv6 range.
|
||||
//
|
||||
// In summary the format is:
|
||||
// { magic 6 bytes, 10 bytes base32 decode of key hash }
|
||||
onionCatNet = ipNet("fd87:d87e:eb43::", 48, 128)
|
||||
)
|
||||
|
||||
// ipNet returns a net.IPNet struct given the passed IP address string, number
|
||||
// of one bits to include at the start of the mask, and the total number of bits
|
||||
// for the mask.
|
||||
func ipNet(ip string, ones, bits int) net.IPNet {
|
||||
return net.IPNet{IP: net.ParseIP(ip), Mask: net.CIDRMask(ones, bits)}
|
||||
}
|
||||
|
||||
func (na *NetAddress) RFC1918() bool {
|
||||
return rfc1918_10.Contains(na.IP) ||
|
||||
rfc1918_192.Contains(na.IP) ||
|
||||
rfc1918_172.Contains(na.IP)
|
||||
}
|
||||
func (na *NetAddress) RFC3849() bool { return rfc3849.Contains(na.IP) }
|
||||
func (na *NetAddress) RFC3927() bool { return rfc3927.Contains(na.IP) }
|
||||
func (na *NetAddress) RFC3964() bool { return rfc3964.Contains(na.IP) }
|
||||
func (na *NetAddress) RFC4193() bool { return rfc4193.Contains(na.IP) }
|
||||
func (na *NetAddress) RFC4380() bool { return rfc4380.Contains(na.IP) }
|
||||
func (na *NetAddress) RFC4843() bool { return rfc4843.Contains(na.IP) }
|
||||
func (na *NetAddress) RFC4862() bool { return rfc4862.Contains(na.IP) }
|
||||
func (na *NetAddress) RFC6052() bool { return rfc6052.Contains(na.IP) }
|
||||
func (na *NetAddress) RFC6145() bool { return rfc6145.Contains(na.IP) }
|
||||
func (na *NetAddress) RFC3849() bool { return rfc3849.Contains(na.IP) }
|
||||
func (na *NetAddress) RFC3927() bool { return rfc3927.Contains(na.IP) }
|
||||
func (na *NetAddress) RFC3964() bool { return rfc3964.Contains(na.IP) }
|
||||
func (na *NetAddress) RFC4193() bool { return rfc4193.Contains(na.IP) }
|
||||
func (na *NetAddress) RFC4380() bool { return rfc4380.Contains(na.IP) }
|
||||
func (na *NetAddress) RFC4843() bool { return rfc4843.Contains(na.IP) }
|
||||
func (na *NetAddress) RFC4862() bool { return rfc4862.Contains(na.IP) }
|
||||
func (na *NetAddress) RFC6052() bool { return rfc6052.Contains(na.IP) }
|
||||
func (na *NetAddress) RFC6145() bool { return rfc6145.Contains(na.IP) }
|
||||
func (na *NetAddress) OnionCatTor() bool { return onionCatNet.Contains(na.IP) }
|
||||
|
||||
func removeProtocolIfDefined(addr string) string {
|
||||
if strings.Contains(addr, "://") {
|
||||
|
||||
@@ -768,31 +768,36 @@ func (a *addrBook) calcOldBucket(addr *p2p.NetAddress) int {
|
||||
}
|
||||
|
||||
// Return a string representing the network group of this address.
|
||||
// This is the /16 for IPv4, the /32 (/36 for he.net) for IPv6, the string
|
||||
// This is the /16 for IPv4 (e.g. 1.2.0.0), the /32 (/36 for he.net) for IPv6, the string
|
||||
// "local" for a local address and the string "unroutable" for an unroutable
|
||||
// address.
|
||||
func (a *addrBook) groupKey(na *p2p.NetAddress) string {
|
||||
if a.routabilityStrict && na.Local() {
|
||||
return groupKeyFor(na, a.routabilityStrict)
|
||||
}
|
||||
|
||||
func groupKeyFor(na *p2p.NetAddress, routabilityStrict bool) string {
|
||||
if routabilityStrict && na.Local() {
|
||||
return "local"
|
||||
}
|
||||
if a.routabilityStrict && !na.Routable() {
|
||||
if routabilityStrict && !na.Routable() {
|
||||
return "unroutable"
|
||||
}
|
||||
|
||||
if ipv4 := na.IP.To4(); ipv4 != nil {
|
||||
return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(16, 32)}).String()
|
||||
return na.IP.Mask(net.CIDRMask(16, 32)).String()
|
||||
}
|
||||
|
||||
if na.RFC6145() || na.RFC6052() {
|
||||
// last four bytes are the ip address
|
||||
ip := na.IP[12:16]
|
||||
return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
|
||||
return ip.Mask(net.CIDRMask(16, 32)).String()
|
||||
}
|
||||
|
||||
if na.RFC3964() {
|
||||
ip := na.IP[2:7]
|
||||
return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
|
||||
|
||||
ip := na.IP[2:6]
|
||||
return ip.Mask(net.CIDRMask(16, 32)).String()
|
||||
}
|
||||
|
||||
if na.RFC4380() {
|
||||
// teredo tunnels have the last 4 bytes as the v4 address XOR
|
||||
// 0xff.
|
||||
@@ -800,20 +805,24 @@ func (a *addrBook) groupKey(na *p2p.NetAddress) string {
|
||||
for i, byte := range na.IP[12:16] {
|
||||
ip[i] = byte ^ 0xff
|
||||
}
|
||||
return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
|
||||
return ip.Mask(net.CIDRMask(16, 32)).String()
|
||||
}
|
||||
|
||||
if na.OnionCatTor() {
|
||||
// group is keyed off the first 4 bits of the actual onion key.
|
||||
return fmt.Sprintf("tor:%d", na.IP[6]&((1<<4)-1))
|
||||
}
|
||||
|
||||
// OK, so now we know ourselves to be a IPv6 address.
|
||||
// bitcoind uses /32 for everything, except for Hurricane Electric's
|
||||
// (he.net) IP range, which it uses /36 for.
|
||||
bits := 32
|
||||
heNet := &net.IPNet{IP: net.ParseIP("2001:470::"),
|
||||
Mask: net.CIDRMask(32, 128)}
|
||||
heNet := &net.IPNet{IP: net.ParseIP("2001:470::"), Mask: net.CIDRMask(32, 128)}
|
||||
if heNet.Contains(na.IP) {
|
||||
bits = 36
|
||||
}
|
||||
|
||||
return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String()
|
||||
ipv6Mask := net.CIDRMask(bits, 128)
|
||||
return na.IP.Mask(ipv6Mask).String()
|
||||
}
|
||||
|
||||
// doubleSha256 calculates sha256(sha256(b)) and returns the resulting bytes.
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"net"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
@@ -545,6 +546,73 @@ func TestMultipleAddrBookAddressSelection(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddrBookGroupKey(t *testing.T) {
|
||||
// non-strict routability
|
||||
testCases := []struct {
|
||||
name string
|
||||
ip string
|
||||
expKey string
|
||||
}{
|
||||
// IPv4 normal.
|
||||
{"ipv4 normal class a", "12.1.2.3", "12.1.0.0"},
|
||||
{"ipv4 normal class b", "173.1.2.3", "173.1.0.0"},
|
||||
{"ipv4 normal class c", "196.1.2.3", "196.1.0.0"},
|
||||
|
||||
// IPv6/IPv4 translations.
|
||||
{"ipv6 rfc3964 with ipv4 encap", "2002:0c01:0203::", "12.1.0.0"},
|
||||
{"ipv6 rfc4380 toredo ipv4", "2001:0:1234::f3fe:fdfc", "12.1.0.0"},
|
||||
{"ipv6 rfc6052 well-known prefix with ipv4", "64:ff9b::0c01:0203", "12.1.0.0"},
|
||||
{"ipv6 rfc6145 translated ipv4", "::ffff:0:0c01:0203", "12.1.0.0"},
|
||||
|
||||
// Tor.
|
||||
{"ipv6 tor onioncat", "fd87:d87e:eb43:1234::5678", "tor:2"},
|
||||
{"ipv6 tor onioncat 2", "fd87:d87e:eb43:1245::6789", "tor:2"},
|
||||
{"ipv6 tor onioncat 3", "fd87:d87e:eb43:1345::6789", "tor:3"},
|
||||
|
||||
// IPv6 normal.
|
||||
{"ipv6 normal", "2602:100::1", "2602:100::"},
|
||||
{"ipv6 normal 2", "2602:0100::1234", "2602:100::"},
|
||||
{"ipv6 hurricane electric", "2001:470:1f10:a1::2", "2001:470:1000::"},
|
||||
{"ipv6 hurricane electric 2", "2001:0470:1f10:a1::2", "2001:470:1000::"},
|
||||
}
|
||||
|
||||
for i, tc := range testCases {
|
||||
nip := net.ParseIP(tc.ip)
|
||||
key := groupKeyFor(p2p.NewNetAddressIPPort(nip, 26656), false)
|
||||
assert.Equal(t, tc.expKey, key, "#%d", i)
|
||||
}
|
||||
|
||||
// strict routability
|
||||
testCases = []struct {
|
||||
name string
|
||||
ip string
|
||||
expKey string
|
||||
}{
|
||||
// Local addresses.
|
||||
{"ipv4 localhost", "127.0.0.1", "local"},
|
||||
{"ipv6 localhost", "::1", "local"},
|
||||
{"ipv4 zero", "0.0.0.0", "local"},
|
||||
{"ipv4 first octet zero", "0.1.2.3", "local"},
|
||||
|
||||
// Unroutable addresses.
|
||||
{"ipv4 invalid bcast", "255.255.255.255", "unroutable"},
|
||||
{"ipv4 rfc1918 10/8", "10.1.2.3", "unroutable"},
|
||||
{"ipv4 rfc1918 172.16/12", "172.16.1.2", "unroutable"},
|
||||
{"ipv4 rfc1918 192.168/16", "192.168.1.2", "unroutable"},
|
||||
{"ipv6 rfc3849 2001:db8::/32", "2001:db8::1234", "unroutable"},
|
||||
{"ipv4 rfc3927 169.254/16", "169.254.1.2", "unroutable"},
|
||||
{"ipv6 rfc4193 fc00::/7", "fc00::1234", "unroutable"},
|
||||
{"ipv6 rfc4843 2001:10::/28", "2001:10::1234", "unroutable"},
|
||||
{"ipv6 rfc4862 fe80::/64", "fe80::1234", "unroutable"},
|
||||
}
|
||||
|
||||
for i, tc := range testCases {
|
||||
nip := net.ParseIP(tc.ip)
|
||||
key := groupKeyFor(p2p.NewNetAddressIPPort(nip, 26656), true)
|
||||
assert.Equal(t, tc.expKey, key, "#%d", i)
|
||||
}
|
||||
}
|
||||
|
||||
func assertMOldAndNNewAddrsInSelection(t *testing.T, m, n int, addrs []*p2p.NetAddress, book *addrBook) {
|
||||
nOld, nNew := countOldAndNewAddrsInSelection(addrs, book)
|
||||
assert.Equal(t, m, nOld, "old addresses")
|
||||
|
||||
@@ -130,7 +130,7 @@ func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor {
|
||||
lastReceivedRequests: cmn.NewCMap(),
|
||||
crawlPeerInfos: make(map[p2p.ID]crawlPeerInfo),
|
||||
}
|
||||
r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
|
||||
r.BaseReactor = *p2p.NewBaseReactor("PEX", r)
|
||||
return r
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/netutil"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/p2p/conn"
|
||||
@@ -122,11 +123,18 @@ func MultiplexTransportResolver(resolver IPResolver) MultiplexTransportOption {
|
||||
return func(mt *MultiplexTransport) { mt.resolver = resolver }
|
||||
}
|
||||
|
||||
// MultiplexTransportMaxIncomingConnections sets the maximum number of
|
||||
// simultaneous connections (incoming). Default: 0 (unlimited)
|
||||
func MultiplexTransportMaxIncomingConnections(n int) MultiplexTransportOption {
|
||||
return func(mt *MultiplexTransport) { mt.maxIncomingConnections = n }
|
||||
}
|
||||
|
||||
// MultiplexTransport accepts and dials tcp connections and upgrades them to
|
||||
// multiplexed peers.
|
||||
type MultiplexTransport struct {
|
||||
netAddr NetAddress
|
||||
listener net.Listener
|
||||
netAddr NetAddress
|
||||
listener net.Listener
|
||||
maxIncomingConnections int // see MaxIncomingConnections
|
||||
|
||||
acceptc chan accept
|
||||
closec chan struct{}
|
||||
@@ -240,6 +248,10 @@ func (mt *MultiplexTransport) Listen(addr NetAddress) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if mt.maxIncomingConnections > 0 {
|
||||
ln = netutil.LimitListener(ln, mt.maxIncomingConnections)
|
||||
}
|
||||
|
||||
mt.netAddr = addr
|
||||
mt.listener = ln
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"math/rand"
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -134,6 +135,50 @@ func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTransportMultiplexMaxIncomingConnections(t *testing.T) {
|
||||
mt := newMultiplexTransport(
|
||||
emptyNodeInfo(),
|
||||
NodeKey{
|
||||
PrivKey: ed25519.GenPrivKey(),
|
||||
},
|
||||
)
|
||||
id := mt.nodeKey.ID()
|
||||
|
||||
MultiplexTransportMaxIncomingConnections(0)(mt)
|
||||
|
||||
addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := mt.Listen(*addr); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
errc := make(chan error)
|
||||
|
||||
go func() {
|
||||
addr := NewNetAddress(id, mt.listener.Addr())
|
||||
|
||||
_, err := addr.Dial()
|
||||
if err != nil {
|
||||
errc <- err
|
||||
return
|
||||
}
|
||||
|
||||
close(errc)
|
||||
}()
|
||||
|
||||
if err := <-errc; err != nil {
|
||||
t.Errorf("connection failed: %v", err)
|
||||
}
|
||||
|
||||
_, err = mt.Accept(peerConfig{})
|
||||
if err == nil || !strings.Contains(err.Error(), "connection reset by peer") {
|
||||
t.Errorf("expected connection reset by peer error, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTransportMultiplexAcceptMultiple(t *testing.T) {
|
||||
mt := testSetupMultiplexTransport(t)
|
||||
laddr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
|
||||
|
||||
77
privval/retry_signer_client.go
Normal file
77
privval/retry_signer_client.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// RetrySignerClient wraps SignerClient adding retry for each operation (except
|
||||
// Ping) w/ a timeout.
|
||||
type RetrySignerClient struct {
|
||||
next *SignerClient
|
||||
retries int
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
// NewRetrySignerClient returns RetrySignerClient. If +retries+ is 0, the
|
||||
// client will be retrying each operation indefinitely.
|
||||
func NewRetrySignerClient(sc *SignerClient, retries int, timeout time.Duration) *RetrySignerClient {
|
||||
return &RetrySignerClient{sc, retries, timeout}
|
||||
}
|
||||
|
||||
var _ types.PrivValidator = (*RetrySignerClient)(nil)
|
||||
|
||||
func (sc *RetrySignerClient) Close() error {
|
||||
return sc.next.Close()
|
||||
}
|
||||
|
||||
func (sc *RetrySignerClient) IsConnected() bool {
|
||||
return sc.next.IsConnected()
|
||||
}
|
||||
|
||||
func (sc *RetrySignerClient) WaitForConnection(maxWait time.Duration) error {
|
||||
return sc.next.WaitForConnection(maxWait)
|
||||
}
|
||||
|
||||
//--------------------------------------------------------
|
||||
// Implement PrivValidator
|
||||
|
||||
func (sc *RetrySignerClient) Ping() error {
|
||||
return sc.next.Ping()
|
||||
}
|
||||
|
||||
func (sc *RetrySignerClient) GetPubKey() crypto.PubKey {
|
||||
for i := 0; i < sc.retries || sc.retries == 0; i++ {
|
||||
pk := sc.next.GetPubKey()
|
||||
if pk != nil {
|
||||
return pk
|
||||
}
|
||||
time.Sleep(sc.timeout)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sc *RetrySignerClient) SignVote(chainID string, vote *types.Vote) error {
|
||||
for i := 0; i < sc.retries || sc.retries == 0; i++ {
|
||||
err := sc.next.SignVote(chainID, vote)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(sc.timeout)
|
||||
}
|
||||
return errors.New("exhausted all attempts to sign vote")
|
||||
}
|
||||
|
||||
func (sc *RetrySignerClient) SignProposal(chainID string, proposal *types.Proposal) error {
|
||||
for i := 0; i < sc.retries || sc.retries == 0; i++ {
|
||||
err := sc.next.SignProposal(chainID, proposal)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(sc.timeout)
|
||||
}
|
||||
return errors.New("exhausted all attempts to sign proposal")
|
||||
}
|
||||
@@ -28,6 +28,55 @@ const (
|
||||
protoTCP = "tcp"
|
||||
)
|
||||
|
||||
// Parsed URL structure
|
||||
type parsedURL struct {
|
||||
url.URL
|
||||
}
|
||||
|
||||
// Parse URL and set defaults
|
||||
func newParsedURL(remoteAddr string) (*parsedURL, error) {
|
||||
u, err := url.Parse(remoteAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// default to tcp if nothing specified
|
||||
if u.Scheme == "" {
|
||||
u.Scheme = protoTCP
|
||||
}
|
||||
|
||||
return &parsedURL{*u}, nil
|
||||
}
|
||||
|
||||
// Change protocol to HTTP for unknown protocols and TCP protocol - useful for RPC connections
|
||||
func (u *parsedURL) SetDefaultSchemeHTTP() {
|
||||
// protocol to use for http operations, to support both http and https
|
||||
switch u.Scheme {
|
||||
case protoHTTP, protoHTTPS, protoWS, protoWSS:
|
||||
// known protocols not changed
|
||||
default:
|
||||
// default to http for unknown protocols (ex. tcp)
|
||||
u.Scheme = protoHTTP
|
||||
}
|
||||
}
|
||||
|
||||
// Get full address without the protocol - useful for Dialer connections
|
||||
func (u parsedURL) GetHostWithPath() string {
|
||||
// Remove protocol, userinfo and # fragment, assume opaque is empty
|
||||
return u.Host + u.EscapedPath()
|
||||
}
|
||||
|
||||
// Get a trimmed address - useful for WS connections
|
||||
func (u parsedURL) GetTrimmedHostWithPath() string {
|
||||
// replace / with . for http requests (kvstore domain)
|
||||
return strings.Replace(u.GetHostWithPath(), "/", ".", -1)
|
||||
}
|
||||
|
||||
// Get a trimmed address with protocol - useful as address in RPC connections
|
||||
func (u parsedURL) GetTrimmedURL() string {
|
||||
return u.Scheme + "://" + u.GetTrimmedHostWithPath()
|
||||
}
|
||||
|
||||
// HTTPClient is a common interface for JSONRPCClient and URIClient.
|
||||
type HTTPClient interface {
|
||||
Call(method string, params map[string]interface{}, result interface{}) (interface{}, error)
|
||||
@@ -35,62 +84,6 @@ type HTTPClient interface {
|
||||
SetCodec(*amino.Codec)
|
||||
}
|
||||
|
||||
// protocol - client's protocol (for example, "http", "https", "wss", "ws", "tcp")
|
||||
// trimmedS - rest of the address (for example, "192.0.2.1:25", "[2001:db8::1]:80") with "/" replaced with "."
|
||||
func toClientAddrAndParse(remoteAddr string) (network string, trimmedS string, err error) {
|
||||
protocol, address, err := parseRemoteAddr(remoteAddr)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
// protocol to use for http operations, to support both http and https
|
||||
var clientProtocol string
|
||||
// default to http for unknown protocols (ex. tcp)
|
||||
switch protocol {
|
||||
case protoHTTP, protoHTTPS, protoWS, protoWSS:
|
||||
clientProtocol = protocol
|
||||
default:
|
||||
clientProtocol = protoHTTP
|
||||
}
|
||||
|
||||
// replace / with . for http requests (kvstore domain)
|
||||
trimmedAddress := strings.Replace(address, "/", ".", -1)
|
||||
return clientProtocol, trimmedAddress, nil
|
||||
}
|
||||
|
||||
func toClientAddress(remoteAddr string) (string, error) {
|
||||
clientProtocol, trimmedAddress, err := toClientAddrAndParse(remoteAddr)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return clientProtocol + "://" + trimmedAddress, nil
|
||||
}
|
||||
|
||||
// network - name of the network (for example, "tcp", "unix")
|
||||
// s - rest of the address (for example, "192.0.2.1:25", "[2001:db8::1]:80")
|
||||
// TODO: Deprecate support for IP:PORT or /path/to/socket
|
||||
func parseRemoteAddr(remoteAddr string) (network string, s string, err error) {
|
||||
parts := strings.SplitN(remoteAddr, "://", 2)
|
||||
var protocol, address string
|
||||
switch {
|
||||
case len(parts) == 1:
|
||||
// default to tcp if nothing specified
|
||||
protocol, address = protoTCP, remoteAddr
|
||||
case len(parts) == 2:
|
||||
protocol, address = parts[0], parts[1]
|
||||
default:
|
||||
return "", "", fmt.Errorf("invalid addr: %s", remoteAddr)
|
||||
}
|
||||
|
||||
// accept http(s) as an alias for tcp
|
||||
switch protocol {
|
||||
case protoHTTP, protoHTTPS:
|
||||
protocol = protoTCP
|
||||
}
|
||||
|
||||
return protocol, address, nil
|
||||
}
|
||||
|
||||
func makeErrorDialer(err error) func(string, string) (net.Conn, error) {
|
||||
return func(_ string, _ string) (net.Conn, error) {
|
||||
return nil, err
|
||||
@@ -98,13 +91,21 @@ func makeErrorDialer(err error) func(string, string) (net.Conn, error) {
|
||||
}
|
||||
|
||||
func makeHTTPDialer(remoteAddr string) func(string, string) (net.Conn, error) {
|
||||
protocol, address, err := parseRemoteAddr(remoteAddr)
|
||||
u, err := newParsedURL(remoteAddr)
|
||||
if err != nil {
|
||||
return makeErrorDialer(err)
|
||||
}
|
||||
|
||||
protocol := u.Scheme
|
||||
|
||||
// accept http(s) as an alias for tcp
|
||||
switch protocol {
|
||||
case protoHTTP, protoHTTPS:
|
||||
protocol = protoTCP
|
||||
}
|
||||
|
||||
return func(proto, addr string) (net.Conn, error) {
|
||||
return net.Dial(protocol, address)
|
||||
return net.Dial(protocol, u.GetHostWithPath())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,10 +143,12 @@ type JSONRPCRequestBatch struct {
|
||||
|
||||
// JSONRPCClient takes params as a slice
|
||||
type JSONRPCClient struct {
|
||||
address string
|
||||
client *http.Client
|
||||
id types.JSONRPCStringID
|
||||
cdc *amino.Codec
|
||||
address string
|
||||
username string
|
||||
password string
|
||||
client *http.Client
|
||||
id types.JSONRPCStringID
|
||||
cdc *amino.Codec
|
||||
}
|
||||
|
||||
// JSONRPCCaller implementers can facilitate calling the JSON RPC endpoint.
|
||||
@@ -170,16 +173,24 @@ func NewJSONRPCClientWithHTTPClient(remote string, client *http.Client) *JSONRPC
|
||||
panic("nil http.Client provided")
|
||||
}
|
||||
|
||||
clientAddress, err := toClientAddress(remote)
|
||||
parsedURL, err := newParsedURL(remote)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("invalid remote %s: %s", remote, err))
|
||||
}
|
||||
|
||||
parsedURL.SetDefaultSchemeHTTP()
|
||||
|
||||
address := parsedURL.GetTrimmedURL()
|
||||
username := parsedURL.User.Username()
|
||||
password, _ := parsedURL.User.Password()
|
||||
|
||||
return &JSONRPCClient{
|
||||
address: clientAddress,
|
||||
client: client,
|
||||
id: types.JSONRPCStringID("jsonrpc-client-" + cmn.RandStr(8)),
|
||||
cdc: amino.NewCodec(),
|
||||
address: address,
|
||||
username: username,
|
||||
password: password,
|
||||
client: client,
|
||||
id: types.JSONRPCStringID("jsonrpc-client-" + cmn.RandStr(8)),
|
||||
cdc: amino.NewCodec(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -195,7 +206,15 @@ func (c *JSONRPCClient) Call(method string, params map[string]interface{}, resul
|
||||
return nil, err
|
||||
}
|
||||
requestBuf := bytes.NewBuffer(requestBytes)
|
||||
httpResponse, err := c.client.Post(c.address, "text/json", requestBuf)
|
||||
httpRequest, err := http.NewRequest(http.MethodPost, c.address, requestBuf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
httpRequest.Header.Set("Content-Type", "text/json")
|
||||
if c.username != "" || c.password != "" {
|
||||
httpRequest.SetBasicAuth(c.username, c.password)
|
||||
}
|
||||
httpResponse, err := c.client.Do(httpRequest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -228,7 +247,15 @@ func (c *JSONRPCClient) sendBatch(requests []*jsonRPCBufferedRequest) ([]interfa
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
httpResponse, err := c.client.Post(c.address, "text/json", bytes.NewBuffer(requestBytes))
|
||||
httpRequest, err := http.NewRequest(http.MethodPost, c.address, bytes.NewBuffer(requestBytes))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
httpRequest.Header.Set("Content-Type", "text/json")
|
||||
if c.username != "" || c.password != "" {
|
||||
httpRequest.SetBasicAuth(c.username, c.password)
|
||||
}
|
||||
httpResponse, err := c.client.Do(httpRequest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -315,12 +342,15 @@ type URIClient struct {
|
||||
|
||||
// The function panics if the provided remote is invalid.
|
||||
func NewURIClient(remote string) *URIClient {
|
||||
clientAddress, err := toClientAddress(remote)
|
||||
parsedURL, err := newParsedURL(remote)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("invalid remote %s: %s", remote, err))
|
||||
}
|
||||
|
||||
parsedURL.SetDefaultSchemeHTTP()
|
||||
|
||||
return &URIClient{
|
||||
address: clientAddress,
|
||||
address: parsedURL.GetTrimmedURL(),
|
||||
client: DefaultHTTPClient(remote),
|
||||
cdc: amino.NewCodec(),
|
||||
}
|
||||
|
||||
22
rpc/lib/client/http_client_test.go
Normal file
22
rpc/lib/client/http_client_test.go
Normal file
@@ -0,0 +1,22 @@
|
||||
package rpcclient
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestHTTPClientMakeHTTPDialer(t *testing.T) {
|
||||
remote := []string{"https://foo-bar.com:80", "http://foo-bar.net:80", "https://user:pass@foo-bar.net:80"}
|
||||
|
||||
for _, f := range remote {
|
||||
u, err := newParsedURL(f)
|
||||
require.NoError(t, err)
|
||||
dialFn := makeHTTPDialer(f)
|
||||
|
||||
addr, err := dialFn(u.Scheme, u.GetHostWithPath())
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, addr)
|
||||
}
|
||||
|
||||
}
|
||||
@@ -27,7 +27,7 @@ const (
|
||||
|
||||
// WSClient is a WebSocket client. The methods of WSClient are safe for use by
|
||||
// multiple goroutines.
|
||||
type WSClient struct { // nolint: maligned
|
||||
type WSClient struct {
|
||||
conn *websocket.Conn
|
||||
cdc *amino.Codec
|
||||
|
||||
@@ -80,18 +80,18 @@ type WSClient struct { // nolint: maligned
|
||||
// pong wait time. The endpoint argument must begin with a `/`.
|
||||
// The function panics if the provided address is invalid.
|
||||
func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSClient {
|
||||
protocol, addr, err := toClientAddrAndParse(remoteAddr)
|
||||
parsedURL, err := newParsedURL(remoteAddr)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("invalid remote %s: %s", remoteAddr, err))
|
||||
}
|
||||
// default to ws protocol, unless wss is explicitly specified
|
||||
if protocol != "wss" {
|
||||
protocol = "ws"
|
||||
if parsedURL.Scheme != protoWSS {
|
||||
parsedURL.Scheme = protoWS
|
||||
}
|
||||
|
||||
c := &WSClient{
|
||||
cdc: amino.NewCodec(),
|
||||
Address: addr,
|
||||
Address: parsedURL.GetTrimmedHostWithPath(),
|
||||
Dialer: makeHTTPDialer(remoteAddr),
|
||||
Endpoint: endpoint,
|
||||
PingPongLatencyTimer: metrics.NewTimer(),
|
||||
@@ -100,7 +100,7 @@ func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSCli
|
||||
readWait: defaultReadWait,
|
||||
writeWait: defaultWriteWait,
|
||||
pingPeriod: defaultPingPeriod,
|
||||
protocol: protocol,
|
||||
protocol: parsedURL.Scheme,
|
||||
}
|
||||
c.BaseService = *cmn.NewBaseService(nil, "WSClient", c)
|
||||
for _, option := range options {
|
||||
|
||||
@@ -64,7 +64,8 @@ func TestWSClientReconnectsAfterReadFailure(t *testing.T) {
|
||||
s := httptest.NewServer(h)
|
||||
defer s.Close()
|
||||
|
||||
c := startClient(t, s.Listener.Addr().String())
|
||||
// https://github.com/golang/go/issues/19297#issuecomment-282651469
|
||||
c := startClient(t, "//" + s.Listener.Addr().String())
|
||||
defer c.Stop()
|
||||
|
||||
wg.Add(1)
|
||||
@@ -96,7 +97,8 @@ func TestWSClientReconnectsAfterWriteFailure(t *testing.T) {
|
||||
h := &myHandler{}
|
||||
s := httptest.NewServer(h)
|
||||
|
||||
c := startClient(t, s.Listener.Addr().String())
|
||||
// https://github.com/golang/go/issues/19297#issuecomment-282651469
|
||||
c := startClient(t, "//" + s.Listener.Addr().String())
|
||||
defer c.Stop()
|
||||
|
||||
wg.Add(2)
|
||||
@@ -124,7 +126,8 @@ func TestWSClientReconnectFailure(t *testing.T) {
|
||||
h := &myHandler{}
|
||||
s := httptest.NewServer(h)
|
||||
|
||||
c := startClient(t, s.Listener.Addr().String())
|
||||
// https://github.com/golang/go/issues/19297#issuecomment-282651469
|
||||
c := startClient(t, "//" + s.Listener.Addr().String())
|
||||
defer c.Stop()
|
||||
|
||||
go func() {
|
||||
@@ -173,7 +176,7 @@ func TestWSClientReconnectFailure(t *testing.T) {
|
||||
func TestNotBlockingOnStop(t *testing.T) {
|
||||
timeout := 2 * time.Second
|
||||
s := httptest.NewServer(&myHandler{})
|
||||
c := startClient(t, s.Listener.Addr().String())
|
||||
c := startClient(t, "//" + s.Listener.Addr().String())
|
||||
c.Call(context.Background(), "a", make(map[string]interface{}))
|
||||
// Let the readRoutine get around to blocking
|
||||
time.Sleep(time.Second)
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
FROM golang:1.13
|
||||
|
||||
# Add testing deps for curl
|
||||
RUN echo 'deb http://httpredir.debian.org/debian testing main non-free contrib' >> /etc/apt/sources.list
|
||||
|
||||
# Grab deps (jq, hexdump, xxd, killall)
|
||||
RUN apt-get update && \
|
||||
apt-get install -y --no-install-recommends \
|
||||
@@ -28,11 +25,11 @@ RUN make install_abci
|
||||
RUN make install
|
||||
|
||||
RUN tendermint testnet \
|
||||
--config $REPO/test/docker/config-template.toml \
|
||||
--node-dir-prefix="mach" \
|
||||
--v=4 \
|
||||
--populate-persistent-peers=false \
|
||||
--o=$REPO/test/p2p/data
|
||||
--config $REPO/test/docker/config-template.toml \
|
||||
--node-dir-prefix="mach" \
|
||||
--v=4 \
|
||||
--populate-persistent-peers=false \
|
||||
--o=$REPO/test/p2p/data
|
||||
|
||||
# Now copy in the code
|
||||
# NOTE: this will overwrite whatever is in vendor/
|
||||
|
||||
@@ -20,7 +20,8 @@ const (
|
||||
// Must be a string because scripts like dist.sh read this file.
|
||||
// XXX: Don't change the name of this variable or you will break
|
||||
// automation :)
|
||||
TMCoreSemVer = "0.32.7"
|
||||
|
||||
TMCoreSemVer = "0.32.12"
|
||||
|
||||
// ABCISemVer is the semantic version of the ABCI library
|
||||
ABCISemVer = "0.16.1"
|
||||
|
||||
Reference in New Issue
Block a user