From 642a24dc9c4fe8f44035a753e4213c9554151e08 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 17 Oct 2016 16:54:51 -0700 Subject: [PATCH] Mempool WAL --- config/tendermint/config.go | 1 + config/tendermint_test/config.go | 1 + mempool/mempool.go | 23 +++++++++++++++++++++++ 3 files changed, 25 insertions(+) diff --git a/config/tendermint/config.go b/config/tendermint/config.go index 4d2323ab3..465297ba3 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -84,6 +84,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("mempool_recheck", true) mapConfig.SetDefault("mempool_recheck_empty", true) mapConfig.SetDefault("mempool_broadcast", true) + mapConfig.SetDefault("mempool_wal", rootDir+"/data/mempool_wal") return mapConfig } diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 120079858..6f3217475 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -97,6 +97,7 @@ func ResetConfig(localPath string) cfg.Config { mapConfig.SetDefault("mempool_recheck", true) mapConfig.SetDefault("mempool_recheck_empty", true) mapConfig.SetDefault("mempool_broadcast", true) + mapConfig.SetDefault("mempool_wal", "") return mapConfig } diff --git a/mempool/mempool.go b/mempool/mempool.go index 2b7be1ddb..c3c9a5f06 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -60,6 +60,9 @@ type Mempool struct { // Keep a cache of already-seen txs. // This reduces the pressure on the proxyApp. cache *txCache + + // A log of mempool txs + wal *AutoFile } func NewMempool(config cfg.Config, proxyAppConn proxy.AppConnMempool) *Mempool { @@ -75,10 +78,22 @@ func NewMempool(config cfg.Config, proxyAppConn proxy.AppConnMempool) *Mempool { cache: newTxCache(cacheSize), } + mempool.initWAL() proxyAppConn.SetResponseCallback(mempool.resCb) return mempool } +func (mem *Mempool) initWAL() { + walFileName := mem.config.GetString("mempool_wal") + if walFileName != "" { + af, err := OpenAutoFile(walFileName) + if err != nil { + PanicSanity(err) + } + mem.wal = af + } +} + // consensus must be able to hold lock to safely update func (mem *Mempool) Lock() { mem.proxyMtx.Lock() @@ -138,6 +153,14 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) { mem.cache.Push(tx) // END CACHE + // WAL + if mem.wal != nil { + // TODO: Notify administrators when WAL fails + mem.wal.Write([]byte(tx)) + mem.wal.Write([]byte("\n")) + } + // END WAL + // NOTE: proxyAppConn may error if tx buffer is full if err = mem.proxyAppConn.Error(); err != nil { return err