From 729d3e188585a3f8788e074713648cbdf8ea82f2 Mon Sep 17 00:00:00 2001 From: William Banfield Date: Wed, 31 Aug 2022 16:39:25 -0400 Subject: [PATCH] use worker pool for parallel parsing --- test/loadtime/report/report.go | 77 +++++++++++++++++++++++++--------- 1 file changed, 58 insertions(+), 19 deletions(-) diff --git a/test/loadtime/report/report.go b/test/loadtime/report/report.go index c6bf13cb3..3bc2949a4 100644 --- a/test/loadtime/report/report.go +++ b/test/loadtime/report/report.go @@ -2,6 +2,7 @@ package report import ( "math" + "sync" "time" "github.com/tendermint/tendermint/test/loadtime/payload" @@ -22,32 +23,70 @@ type Report struct { } func GenerateFromBlockStore(s blockStore) (Report, error) { + type payloadData struct { + l time.Duration + err error + } + type txData struct { + tx []byte + bt time.Time + } + + const poolSize = 16 + + txc := make(chan txData) + pdc := make(chan payloadData, poolSize) + + wg := &sync.WaitGroup{} + wg.Add(poolSize) + for i := 0; i < poolSize; i++ { + go func() { + defer wg.Done() + for b := range txc { + p, err := payload.FromBytes(b.tx) + if err != nil { + pdc <- payloadData{err: err} + continue + } + l := p.Time.AsTime().Sub(b.bt) + pdc <- payloadData{l: l} + } + }() + } + go func() { + wg.Wait() + close(pdc) + }() + r := Report{ Max: 0, Min: math.MaxInt64, } var sum int64 - for i := s.Base(); i < s.Height(); i++ { - b := s.LoadBlock(i) - for _, tx := range b.Data.Txs { - p, err := payload.FromBytes(tx) - if err != nil { - r.ErrorCount++ - continue + go func() { + for i := s.Base(); i < s.Height(); i++ { + b := s.LoadBlock(i) + for _, tx := range b.Data.Txs { + txc <- txData{tx: tx, bt: b.Time} } - t := p.Time.AsTime().Sub(b.Time) - r.All = append(r.All, t) - if t > r.Max { - r.Max = t - } else if t < r.Min { - r.Min = t - } - - // Using an int64 here makes an assumption about the scale and quantity of the data we are processing. - // If all latencies were 2 seconds, we would need around 4 billion records to overflow this. - // We are therefore assuming that the data does not exceed these bounds. - sum += int64(t) } + close(txc) + }() + for pd := range pdc { + if pd.err != nil { + r.ErrorCount++ + continue + } + r.All = append(r.All, pd.l) + if pd.l > r.Max { + r.Max = pd.l + } else if pd.l < r.Min { + r.Min = pd.l + } + // Using an int64 here makes an assumption about the scale and quantity of the data we are processing. + // If all latencies were 2 seconds, we would need around 4 billion records to overflow this. + // We are therefore assuming that the data does not exceed these bounds. + sum += int64(pd.l) } r.Avg = time.Duration(sum / int64(len(r.All))) return r, nil