tendermint/test/loadtime/report/report.go

package report

import (
	"math"
	"sync"
	"time"

	"github.com/gofrs/uuid"
	"github.com/tendermint/tendermint/test/loadtime/payload"
	"github.com/tendermint/tendermint/types"
	"gonum.org/v1/gonum/stat"
)

// BlockStore defines the set of methods needed by the report generator from
// Tendermint's store.BlockStore type. Using an interface allows tests to
// simulate the required behavior without having to use the more complex real
// API.
type BlockStore interface {
	Height() int64
	Base() int64
	LoadBlock(int64) *types.Block
}

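// Because BlockStore is a small interface, a minimal in-memory stub is enough
// to drive the generator in tests. The sketch below is illustrative only and
// is not part of the original file; the mockStore name and its layout are
// assumptions.
type mockStore struct {
	base   int64
	blocks []*types.Block // blocks[i] holds the block at height base+int64(i)
}

func (m *mockStore) Base() int64   { return m.base }
func (m *mockStore) Height() int64 { return m.base + int64(len(m.blocks)) - 1 }
func (m *mockStore) LoadBlock(h int64) *types.Block {
	return m.blocks[h-m.base]
}
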
// DataPoint contains the set of data collected for each transaction.
type DataPoint struct {
	Duration  time.Duration
	BlockTime time.Time
	Hash      []byte
}

// Report contains the data calculated from reading the timestamped
// transactions of each block found in the blockstore.
type Report struct {
	ID                      uuid.UUID
	Rate, Connections, Size uint64
	Max, Min, Avg, StdDev   time.Duration

	// NegativeCount is the number of negative durations encountered while
	// reading the transaction data. A negative duration means that a
	// transaction timestamp was greater than the timestamp of the block it
	// was included in and likely indicates an issue with the experimental
	// setup.
	NegativeCount int

	// All contains all data points gathered from all valid transactions.
	// The order of the contents of All is not guaranteed to match the order
	// of transactions in the chain.
	All []DataPoint

	// sum is used for calculating the average during report creation.
	sum int64
}

// Reports holds the set of generated Report values, indexed by the UUID
// embedded in each transaction's payload.
type Reports struct {
	s map[uuid.UUID]Report
	l []Report

	// errorCount is the number of parsing errors encountered while reading the
	// transaction data. Parsing errors may occur if a transaction not generated
	// by the payload package is submitted to the chain.
	errorCount int
}

// List returns the set of generated Reports.
func (rs *Reports) List() []Report {
	return rs.l
}

// ErrorCount returns the number of parsing errors encountered while reading
// the transaction data.
func (rs *Reports) ErrorCount() int {
	return rs.errorCount
}

func (rs *Reports) addDataPoint(id uuid.UUID, l time.Duration, bt time.Time, hash []byte, conns, rate, size uint64) {
	r, ok := rs.s[id]
	if !ok {
		r = Report{
			Max:         0,
			Min:         math.MaxInt64,
			ID:          id,
			Connections: conns,
			Rate:        rate,
			Size:        size,
		}
		rs.s[id] = r
	}
	r.All = append(r.All, DataPoint{Duration: l, BlockTime: bt, Hash: hash})
	if l > r.Max {
		r.Max = l
	}
	if l < r.Min {
		r.Min = l
	}
	if int64(l) < 0 {
		r.NegativeCount++
	}
	// Using an int64 sum makes an assumption about the scale and quantity of
	// the data being processed. Durations are counted in nanoseconds, so if
	// every latency were 2 seconds it would take roughly 4.6 billion records
	// (math.MaxInt64 / int64(2*time.Second)) to overflow. We therefore assume
	// that the data does not exceed these bounds.
	r.sum += int64(l)
	rs.s[id] = r
}

func (rs *Reports) calculateAll() {
	rs.l = make([]Report, 0, len(rs.s))
	for _, r := range rs.s {
		if len(r.All) == 0 {
			r.Min = 0
			rs.l = append(rs.l, r)
			continue
		}
		r.Avg = time.Duration(r.sum / int64(len(r.All)))
		r.StdDev = time.Duration(int64(stat.StdDev(toFloat(r.All), nil)))
		rs.l = append(rs.l, r)
	}
}

func (rs *Reports) addError() {
	rs.errorCount++
}

// GenerateFromBlockStore creates a Report using the data in the provided
// BlockStore.
func GenerateFromBlockStore(s BlockStore) (*Reports, error) {
	type payloadData struct {
		id                      uuid.UUID
		l                       time.Duration
		bt                      time.Time
		hash                    []byte
		connections, rate, size uint64
		err                     error
	}
	type txData struct {
		tx types.Tx
		bt time.Time
	}
	reports := &Reports{
		s: make(map[uuid.UUID]Report),
	}

	// Deserializing to proto can be slow but does not depend on other data
	// and can therefore be done in parallel. Deserializing in parallel does
	// mean that the resulting data is not guaranteed to be delivered in the
	// same order it was given to the worker pool.
	const poolSize = 16

	txc := make(chan txData)
	pdc := make(chan payloadData, poolSize)
	wg := &sync.WaitGroup{}
	wg.Add(poolSize)
	for i := 0; i < poolSize; i++ {
		go func() {
			defer wg.Done()
			for b := range txc {
				p, err := payload.FromBytes(b.tx)
				if err != nil {
					pdc <- payloadData{err: err}
					continue
				}
				l := b.bt.Sub(p.Time.AsTime())
				// p.Id carries the 16 raw bytes of the experiment UUID.
				idb := (*[16]byte)(p.Id)
				pdc <- payloadData{
					l:           l,
					bt:          b.bt,
					hash:        b.tx.Hash(),
					id:          uuid.UUID(*idb),
					connections: p.Connections,
					rate:        p.Rate,
					size:        p.Size,
				}
			}
		}()
	}
	// Close the payload channel once every worker has drained txc.
	go func() {
		wg.Wait()
		close(pdc)
	}()
	go func() {
		base, height := s.Base(), s.Height()
		prev := s.LoadBlock(base)
		for i := base + 1; i <= height; i++ {
			// Data from two adjacent blocks are used here simultaneously:
			// blocks of height H and H+1. The transactions of the block of
			// height H are used with the timestamp from the block of height
			// H+1. This is done because the timestamp from H+1 is calculated
			// by using the precommits submitted at height H. The timestamp in
			// block H+1 represents the time at which block H was committed.
			//
			// In the (very unlikely) event that the very last block of the
			// chain contains payload transactions, those transactions will
			// not be used in the latency calculations, because the last block
			// whose transactions are used is the block one before the last.
			cur := s.LoadBlock(i)
			for _, tx := range prev.Data.Txs {
				txc <- txData{tx: tx, bt: cur.Time}
			}
			prev = cur
		}
		close(txc)
	}()
	for pd := range pdc {
		if pd.err != nil {
			reports.addError()
			continue
		}
		reports.addDataPoint(pd.id, pd.l, pd.bt, pd.hash, pd.connections, pd.rate, pd.size)
	}
	reports.calculateAll()
	return reports, nil
}

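// A typical call site (a sketch, not part of the original file) generates the
// reports from a real or stubbed store and walks the results. The mockStore
// stub above and the blocks variable here are assumptions for illustration:
//
//	reports, err := GenerateFromBlockStore(&mockStore{base: 1, blocks: blocks})
//	if err != nil {
//		return err
//	}
//	for _, r := range reports.List() {
//		fmt.Printf("%s: txs=%d avg=%s stddev=%s negative=%d\n",
//			r.ID, len(r.All), r.Avg, r.StdDev, r.NegativeCount)
//	}
//	fmt.Println("parse errors:", reports.ErrorCount())
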
// toFloat converts the durations of the data points to float64 values so that
// they can be consumed by gonum's statistics functions.
func toFloat(in []DataPoint) []float64 {
	r := make([]float64, len(in))
	for i, v := range in {
		r[i] = float64(int64(v.Duration))
	}
	return r
}