mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-09 06:33:16 +00:00
no empty blocks
This commit is contained in:
@@ -10,13 +10,15 @@ import (
|
||||
"time"
|
||||
|
||||
fail "github.com/ebuchman/fail-test"
|
||||
|
||||
wire "github.com/tendermint/go-wire"
|
||||
cmn "github.com/tendermint/tmlibs/common"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
cmn "github.com/tendermint/tmlibs/common"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
)
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
@@ -225,6 +227,9 @@ type ConsensusState struct {
|
||||
doPrevote func(height, round int)
|
||||
setProposal func(proposal *types.Proposal) error
|
||||
|
||||
// signifies that txs are available for proposal
|
||||
txsAvailable chan RoundState
|
||||
|
||||
// closed when we finish shutting down
|
||||
done chan struct{}
|
||||
}
|
||||
@@ -239,6 +244,7 @@ func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppCon
|
||||
peerMsgQueue: make(chan msgInfo, msgQueueSize),
|
||||
internalMsgQueue: make(chan msgInfo, msgQueueSize),
|
||||
timeoutTicker: NewTimeoutTicker(),
|
||||
txsAvailable: make(chan RoundState),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
// set function defaults (may be overwritten before calling Start)
|
||||
@@ -619,6 +625,8 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
|
||||
var mi msgInfo
|
||||
|
||||
select {
|
||||
case rs_ := <-cs.txsAvailable:
|
||||
cs.enterPropose(rs_.Height, rs_.Round)
|
||||
case mi = <-cs.peerMsgQueue:
|
||||
cs.wal.Save(mi)
|
||||
// handles proposals, block parts, votes
|
||||
@@ -770,11 +778,41 @@ func (cs *ConsensusState) enterNewRound(height int, round int) {
|
||||
|
||||
types.FireEventNewRound(cs.evsw, cs.RoundStateEvent())
|
||||
|
||||
// Immediately go to enterPropose.
|
||||
cs.enterPropose(height, round)
|
||||
// Wait for txs to be available in the mempool
|
||||
// before we enterPropose
|
||||
go cs.waitForTxs(height, round)
|
||||
}
|
||||
|
||||
// Enter: from enterNewRound(height,round).
|
||||
func (cs *ConsensusState) waitForTxs(height, round int) {
|
||||
// if we're the proposer, start a heartbeat routine
|
||||
// to tell other peers we're just waiting for txs (for debugging)
|
||||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
if cs.isProposer() {
|
||||
go cs.proposerHeartbeat(done)
|
||||
}
|
||||
|
||||
// wait for the mempool to have some txs
|
||||
<-cs.mempool.TxsAvailable()
|
||||
|
||||
// now we can enterPropose
|
||||
cs.txsAvailable <- RoundState{Height: height, Round: round}
|
||||
}
|
||||
|
||||
func (cs *ConsensusState) proposerHeartbeat(done chan struct{}) {
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
default:
|
||||
// TODO: broadcast heartbeat
|
||||
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Enter: from enter NewRound(height,round), once txs are in the mempool
|
||||
func (cs *ConsensusState) enterPropose(height int, round int) {
|
||||
if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPropose <= cs.Step) {
|
||||
cs.Logger.Debug(cmn.Fmt("enterPropose(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
|
||||
|
||||
@@ -72,6 +72,10 @@ type Mempool struct {
|
||||
// A log of mempool txs
|
||||
wal *auto.AutoFile
|
||||
|
||||
// fires once for each height, when the mempool is not empty
|
||||
txsAvailable chan struct{}
|
||||
notifiedTxsAvailable bool
|
||||
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
@@ -88,6 +92,7 @@ func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool) *M
|
||||
recheckEnd: nil,
|
||||
logger: log.NewNopLogger(),
|
||||
cache: newTxCache(cacheSize),
|
||||
txsAvailable: make(chan struct{}, 1),
|
||||
}
|
||||
mempool.initWAL()
|
||||
proxyAppConn.SetResponseCallback(mempool.resCb)
|
||||
@@ -215,6 +220,7 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
|
||||
tx: req.GetCheckTx().Tx,
|
||||
}
|
||||
mem.txs.PushBack(memTx)
|
||||
mem.alertIfTxsAvailable()
|
||||
} else {
|
||||
// ignore bad transaction
|
||||
mem.logger.Info("Bad Transaction", "res", r)
|
||||
@@ -256,12 +262,25 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) {
|
||||
// Done!
|
||||
atomic.StoreInt32(&mem.rechecking, 0)
|
||||
mem.logger.Info("Done rechecking txs")
|
||||
|
||||
mem.alertIfTxsAvailable()
|
||||
}
|
||||
default:
|
||||
// ignore other messages
|
||||
}
|
||||
}
|
||||
|
||||
func (mem *Mempool) alertIfTxsAvailable() {
|
||||
if !mem.notifiedTxsAvailable && mem.Size() > 0 {
|
||||
mem.notifiedTxsAvailable = true
|
||||
mem.txsAvailable <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func (mem *Mempool) TxsAvailable() chan struct{} {
|
||||
return mem.txsAvailable
|
||||
}
|
||||
|
||||
// Reap returns a list of transactions currently in the mempool.
|
||||
// If maxTxs is -1, there is no cap on the number of returned transactions.
|
||||
func (mem *Mempool) Reap(maxTxs int) types.Txs {
|
||||
@@ -307,13 +326,15 @@ func (mem *Mempool) Update(height int, txs types.Txs) {
|
||||
|
||||
// Set height
|
||||
mem.height = height
|
||||
mem.notifiedTxsAvailable = false
|
||||
|
||||
// Remove transactions that are already in txs.
|
||||
goodTxs := mem.filterTxs(txsMap)
|
||||
// Recheck mempool txs if any txs were committed in the block
|
||||
// NOTE/XXX: in some apps a tx could be invalidated due to EndBlock,
|
||||
// so we really still do need to recheck, but this is for debugging
|
||||
if mem.config.Recheck && (mem.config.RecheckEmpty || len(txs) > 0) {
|
||||
mem.logger.Info("Recheck txs", "numtxs", len(goodTxs))
|
||||
mem.logger.Info("Recheck txs", "numtxs", len(goodTxs), "height", height)
|
||||
mem.recheckTxs(goodTxs)
|
||||
// At this point, mem.txs are being rechecked.
|
||||
// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
|
||||
|
||||
@@ -139,6 +139,7 @@ func NewNode(config *cfg.Config, privValidator *types.PrivValidator, clientCreat
|
||||
mempoolLogger := logger.With("module", "mempool")
|
||||
mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool())
|
||||
mempool.SetLogger(mempoolLogger)
|
||||
mempool.Update(state.LastBlockHeight, nil)
|
||||
mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool)
|
||||
mempoolReactor.SetLogger(mempoolLogger)
|
||||
|
||||
|
||||
@@ -23,6 +23,8 @@ type Mempool interface {
|
||||
Reap(int) Txs
|
||||
Update(height int, txs Txs)
|
||||
Flush()
|
||||
|
||||
TxsAvailable() chan struct{}
|
||||
}
|
||||
|
||||
type MockMempool struct {
|
||||
@@ -35,6 +37,7 @@ func (m MockMempool) CheckTx(tx Tx, cb func(*abci.Response)) error { return nil
|
||||
func (m MockMempool) Reap(n int) Txs { return Txs{} }
|
||||
func (m MockMempool) Update(height int, txs Txs) {}
|
||||
func (m MockMempool) Flush() {}
|
||||
func (m MockMempool) TxsAvailable() chan struct{} { return make(chan struct{}) }
|
||||
|
||||
//------------------------------------------------------
|
||||
// blockstore
|
||||
|
||||
Reference in New Issue
Block a user