From 8e5e5c689f7afd96a391b607b8bd1630482d4431 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 11 Apr 2016 17:19:56 -0400 Subject: [PATCH] lock mempool for commit and update. closes #202 --- consensus/state.go | 47 +++++++++++++++++++++++++++++++++------------- mempool/mempool.go | 13 +++++++++++-- state/execution.go | 13 ------------- 3 files changed, 45 insertions(+), 28 deletions(-) diff --git a/consensus/state.go b/consensus/state.go index fdbfdb6e6..40d77be2a 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1217,7 +1217,14 @@ func (cs *ConsensusState) finalizeCommit(height int) { err := stateCopy.ExecBlock(cs.evsw, cs.proxyAppConn, block, blockParts.Header()) if err != nil { // TODO: handle this gracefully. - PanicQ(Fmt("Exec failed for application")) + PanicQ(Fmt("Exec failed for application: %v", err)) + } + + // lock mempool, commit state, update mempoool + err = cs.commitStateUpdateMempool(stateCopy, block) + if err != nil { + // TODO: handle this gracefully. + PanicQ(Fmt("Commit failed for application: %v", err)) } // Save to blockStore. @@ -1227,21 +1234,9 @@ func (cs *ConsensusState) finalizeCommit(height int) { cs.blockStore.SaveBlock(block, blockParts, seenCommit) } - /* - // Commit to proxyAppConn - err = cs.proxyAppConn.CommitSync() - if err != nil { - // TODO: handle this gracefully. - PanicQ(Fmt("Commit failed for application")) - } - */ - // Save the state. stateCopy.Save() - // Update mempool. - cs.mempool.Update(block.Height, block.Txs) - // NewHeightStep! cs.updateToState(stateCopy) @@ -1256,6 +1251,32 @@ func (cs *ConsensusState) finalizeCommit(height int) { return } +// mempool must be locked during commit and update +// because state is typically reset on Commit and old txs must be replayed +// against committed state before new txs are run in the mempool, lest they be invalid +func (cs *ConsensusState) commitStateUpdateMempool(s *sm.State, block *types.Block) error { + cs.mempool.Lock() + defer cs.mempool.Unlock() + + // Commit block, get hash back + res := cs.proxyAppConn.CommitSync() + if res.IsErr() { + log.Warn("Error in proxyAppConn.CommitSync", "error", res) + return res + } + if res.Log != "" { + log.Debug("Commit.Log: " + res.Log) + } + + // Set the state's new AppHash + s.AppHash = res.Data + + // Update mempool. + cs.mempool.Update(block.Height, block.Txs) + + return nil +} + //----------------------------------------------------------------------------- func (cs *ConsensusState) setProposal(proposal *types.Proposal) error { diff --git a/mempool/mempool.go b/mempool/mempool.go index 6df62feee..0523e1d3a 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -77,6 +77,14 @@ func NewMempool(proxyAppConn proxy.AppConn) *Mempool { return mempool } +func (mem *Mempool) Lock() { + mem.proxyMtx.Lock() +} + +func (mem *Mempool) Unlock() { + mem.proxyMtx.Unlock() +} + func (mem *Mempool) Size() int { return mem.txs.Len() } @@ -219,9 +227,10 @@ func (mem *Mempool) collectTxs(maxTxs int) []types.Tx { // Tell mempool that these txs were committed. // Mempool will discard these txs. // NOTE: this should be called *after* block is committed by consensus. +// NOTE: unsafe; Lock/Unlock must be managed by caller func (mem *Mempool) Update(height int, txs []types.Tx) { - mem.proxyMtx.Lock() - defer mem.proxyMtx.Unlock() + // mem.proxyMtx.Lock() + // defer mem.proxyMtx.Unlock() mem.proxyAppConn.FlushSync() // To flush async resCb calls e.g. from CheckTx // First, create a lookup map of txns in new txs. diff --git a/state/execution.go b/state/execution.go index 67c1d56bb..d2b027c84 100644 --- a/state/execution.go +++ b/state/execution.go @@ -94,20 +94,7 @@ func (s *State) execBlockOnProxyApp(evsw *events.EventSwitch, proxyAppConn proxy // TODO: Do something with changedValidators log.Info("TODO: Do something with changedValidators", changedValidators) - // Commit block, get hash back - res := proxyAppConn.CommitSync() - if res.IsErr() { - log.Warn("Error in proxyAppConn.CommitSync", "error", res) - return res - } - if res.Log != "" { - log.Debug("Commit.Log: " + res.Log) - } log.Info(Fmt("ExecBlock got %v valid txs and %v invalid txs", validTxs, invalidTxs)) - - // Set the state's new AppHash - s.AppHash = res.Data - return nil }