use worker pool for parallel parsing

This commit is contained in:
William Banfield
2022-08-31 16:39:25 -04:00
parent 8960a19822
commit 729d3e1885

View File

@@ -2,6 +2,7 @@ package report
import (
"math"
"sync"
"time"
"github.com/tendermint/tendermint/test/loadtime/payload"
@@ -22,32 +23,70 @@ type Report struct {
}
func GenerateFromBlockStore(s blockStore) (Report, error) {
type payloadData struct {
l time.Duration
err error
}
type txData struct {
tx []byte
bt time.Time
}
const poolSize = 16
txc := make(chan txData)
pdc := make(chan payloadData, poolSize)
wg := &sync.WaitGroup{}
wg.Add(poolSize)
for i := 0; i < poolSize; i++ {
go func() {
defer wg.Done()
for b := range txc {
p, err := payload.FromBytes(b.tx)
if err != nil {
pdc <- payloadData{err: err}
continue
}
l := p.Time.AsTime().Sub(b.bt)
pdc <- payloadData{l: l}
}
}()
}
go func() {
wg.Wait()
close(pdc)
}()
r := Report{
Max: 0,
Min: math.MaxInt64,
}
var sum int64
for i := s.Base(); i < s.Height(); i++ {
b := s.LoadBlock(i)
for _, tx := range b.Data.Txs {
p, err := payload.FromBytes(tx)
if err != nil {
r.ErrorCount++
continue
go func() {
for i := s.Base(); i < s.Height(); i++ {
b := s.LoadBlock(i)
for _, tx := range b.Data.Txs {
txc <- txData{tx: tx, bt: b.Time}
}
t := p.Time.AsTime().Sub(b.Time)
r.All = append(r.All, t)
if t > r.Max {
r.Max = t
} else if t < r.Min {
r.Min = t
}
// Using an int64 here makes an assumption about the scale and quantity of the data we are processing.
// If all latencies were 2 seconds, we would need around 4 billion records to overflow this.
// We are therefore assuming that the data does not exceed these bounds.
sum += int64(t)
}
close(txc)
}()
for pd := range pdc {
if pd.err != nil {
r.ErrorCount++
continue
}
r.All = append(r.All, pd.l)
if pd.l > r.Max {
r.Max = pd.l
} else if pd.l < r.Min {
r.Min = pd.l
}
// Using an int64 here makes an assumption about the scale and quantity of the data we are processing.
// If all latencies were 2 seconds, we would need around 4 billion records to overflow this.
// We are therefore assuming that the data does not exceed these bounds.
sum += int64(pd.l)
}
r.Avg = time.Duration(sum / int64(len(r.All)))
return r, nil