mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-10 15:07:24 +00:00
internal/libs/protoio: optimize MarshalDelimited by plain byteslice allocations+sync.Pool (#7325) (#7426)
Noticed in profiles that invoking *VoteSignBytes always created a
bytes.Buffer, then discarded it inside protoio.MarshalDelimited.
I dug further and examined the call paths and noticed that we
unconditionally create the bytes.Buffer, even though we might
have proto messages (in the common case) that implement
MarshalTo([]byte), and invoked varintWriter. Instead by inlining
this case, we skip a bunch of allocations and CPU cycles,
which then reflects properly on all calling functions. Here
are the benchmark results:
```shell
$ benchstat before.txt after.txt
name old time/op new time/op delta
types.VoteSignBytes-8 705ns ± 3% 573ns ± 6% -18.74% (p=0.000 n=18+20)
types.CommitVoteSignBytes-8 8.15µs ± 9% 6.81µs ± 4% -16.51% (p=0.000 n=20+19)
protoio.MarshalDelimitedWithMarshalTo-8 788ns ± 8% 772ns ± 3% -2.01% (p=0.050 n=20+20)
protoio.MarshalDelimitedNoMarshalTo-8 989ns ± 4% 845ns ± 2% -14.51% (p=0.000 n=20+18)
name old alloc/op new alloc/op delta
types.VoteSignBytes-8 792B ± 0% 600B ± 0% -24.24% (p=0.000 n=20+20)
types.CommitVoteSignBytes-8 9.52kB ± 0% 7.60kB ± 0% -20.17% (p=0.000 n=20+20)
protoio.MarshalDelimitedNoMarshalTo-8 808B ± 0% 440B ± 0% -45.54% (p=0.000 n=20+20)
name old allocs/op new allocs/op delta
types.VoteSignBytes-8 13.0 ± 0% 10.0 ± 0% -23.08% (p=0.000 n=20+20)
types.CommitVoteSignBytes-8 140 ± 0% 110 ± 0% -21.43% (p=0.000 n=20+20)
protoio.MarshalDelimitedNoMarshalTo-8 10.0 ± 0% 7.0 ± 0% -30.00% (p=0.000 n=20+20)
```
Thanks to Tharsis who tasked me to help them increase TPS and who
are keen on improving Tendermint and efficiency.
(cherry picked from commit 3e92899bd9)
Co-authored-by: Emmanuel T Odeke <emmanuel@orijtech.com>
This commit is contained in:
@@ -29,6 +29,7 @@ Special thanks to external contributors on this release:
|
||||
- [rpc] [\#7270](https://github.com/tendermint/tendermint/pull/7270) Add `header` and `header_by_hash` RPC Client queries. (@fedekunze) (@cmwaters)
|
||||
|
||||
### IMPROVEMENTS
|
||||
- [internal/protoio] \#7325 Optimized `MarshalDelimited` by inlining the common case and using a `sync.Pool` in the worst case. (@odeke-em)
|
||||
|
||||
- [\#7338](https://github.com/tendermint/tendermint/pull/7338) pubsub: Performance improvements for the event query API (backport of #7319) (@creachadair)
|
||||
|
||||
|
||||
@@ -34,6 +34,7 @@ import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
)
|
||||
@@ -90,11 +91,44 @@ func (w *varintWriter) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func MarshalDelimited(msg proto.Message) ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
_, err := NewDelimitedWriter(&buf).WriteMsg(msg)
|
||||
func varintWrittenBytes(m marshaler, size int) ([]byte, error) {
|
||||
buf := make([]byte, size+binary.MaxVarintLen64)
|
||||
n := binary.PutUvarint(buf, uint64(size))
|
||||
nw, err := m.MarshalTo(buf[n:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return buf.Bytes(), nil
|
||||
return buf[:n+nw], nil
|
||||
}
|
||||
|
||||
var bufPool = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
return new(bytes.Buffer)
|
||||
},
|
||||
}
|
||||
|
||||
func MarshalDelimited(msg proto.Message) ([]byte, error) {
|
||||
// The goal here is to write proto message as is knowning already if
|
||||
// the exact size can be retrieved and if so just use that.
|
||||
if m, ok := msg.(marshaler); ok {
|
||||
size, ok := getSize(msg)
|
||||
if ok {
|
||||
return varintWrittenBytes(m, size)
|
||||
}
|
||||
}
|
||||
|
||||
// Otherwise, go down the route of using proto.Marshal,
|
||||
// and use the buffer pool to retrieve a writer.
|
||||
buf := bufPool.Get().(*bytes.Buffer)
|
||||
defer bufPool.Put(buf)
|
||||
buf.Reset()
|
||||
_, err := NewDelimitedWriter(buf).WriteMsg(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Given that we are reusing buffers, we should
|
||||
// make a copy of the returned bytes.
|
||||
bytesCopy := make([]byte, buf.Len())
|
||||
copy(bytesCopy, buf.Bytes())
|
||||
return bytesCopy, nil
|
||||
}
|
||||
|
||||
91
internal/libs/protoio/writer_test.go
Normal file
91
internal/libs/protoio/writer_test.go
Normal file
@@ -0,0 +1,91 @@
|
||||
package protoio_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/crypto/tmhash"
|
||||
"github.com/tendermint/tendermint/internal/libs/protoio"
|
||||
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
func aVote() *types.Vote {
|
||||
var stamp, err = time.Parse(types.TimeFormat, "2017-12-25T03:00:01.234Z")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return &types.Vote{
|
||||
Type: tmproto.SignedMsgType(byte(tmproto.PrevoteType)),
|
||||
Height: 12345,
|
||||
Round: 2,
|
||||
Timestamp: stamp,
|
||||
BlockID: types.BlockID{
|
||||
Hash: tmhash.Sum([]byte("blockID_hash")),
|
||||
PartSetHeader: types.PartSetHeader{
|
||||
Total: 1000000,
|
||||
Hash: tmhash.Sum([]byte("blockID_part_set_header_hash")),
|
||||
},
|
||||
},
|
||||
ValidatorAddress: crypto.AddressHash([]byte("validator_address")),
|
||||
ValidatorIndex: 56789,
|
||||
}
|
||||
}
|
||||
|
||||
type excludedMarshalTo struct {
|
||||
msg proto.Message
|
||||
}
|
||||
|
||||
func (emt *excludedMarshalTo) ProtoMessage() {}
|
||||
func (emt *excludedMarshalTo) String() string {
|
||||
return emt.msg.String()
|
||||
}
|
||||
func (emt *excludedMarshalTo) Reset() {
|
||||
emt.msg.Reset()
|
||||
}
|
||||
func (emt *excludedMarshalTo) Marshal() ([]byte, error) {
|
||||
return proto.Marshal(emt.msg)
|
||||
}
|
||||
|
||||
var _ proto.Message = (*excludedMarshalTo)(nil)
|
||||
|
||||
var sink interface{}
|
||||
|
||||
func BenchmarkMarshalDelimitedWithMarshalTo(b *testing.B) {
|
||||
msgs := []proto.Message{
|
||||
aVote().ToProto(),
|
||||
}
|
||||
benchmarkMarshalDelimited(b, msgs)
|
||||
}
|
||||
|
||||
func BenchmarkMarshalDelimitedNoMarshalTo(b *testing.B) {
|
||||
msgs := []proto.Message{
|
||||
&excludedMarshalTo{aVote().ToProto()},
|
||||
}
|
||||
benchmarkMarshalDelimited(b, msgs)
|
||||
}
|
||||
|
||||
func benchmarkMarshalDelimited(b *testing.B, msgs []proto.Message) {
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, msg := range msgs {
|
||||
blob, err := protoio.MarshalDelimited(msg)
|
||||
require.Nil(b, err)
|
||||
sink = blob
|
||||
}
|
||||
}
|
||||
|
||||
if sink == nil {
|
||||
b.Fatal("Benchmark did not run")
|
||||
}
|
||||
|
||||
// Reset the sink.
|
||||
sink = (interface{})(nil)
|
||||
}
|
||||
@@ -294,3 +294,52 @@ func TestVoteProtobuf(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var sink interface{}
|
||||
|
||||
var protoVote *tmproto.Vote
|
||||
var sampleCommit *Commit
|
||||
|
||||
func init() {
|
||||
protoVote = examplePrecommit().ToProto()
|
||||
|
||||
lastID := makeBlockIDRandom()
|
||||
voteSet, _, vals := randVoteSet(2, 1, tmproto.PrecommitType, 10, 1)
|
||||
commit, err := makeCommit(lastID, 2, 1, voteSet, vals, time.Now())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
sampleCommit = commit
|
||||
}
|
||||
|
||||
func BenchmarkVoteSignBytes(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
sink = VoteSignBytes("test_chain_id", protoVote)
|
||||
}
|
||||
|
||||
if sink == nil {
|
||||
b.Fatal("Benchmark did not run")
|
||||
}
|
||||
|
||||
// Reset the sink.
|
||||
sink = (interface{})(nil)
|
||||
}
|
||||
|
||||
func BenchmarkCommitVoteSignBytes(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
for index := range sampleCommit.Signatures {
|
||||
sink = sampleCommit.VoteSignBytes("test_chain_id", int32(index))
|
||||
}
|
||||
}
|
||||
|
||||
if sink == nil {
|
||||
b.Fatal("Benchmark did not run")
|
||||
}
|
||||
|
||||
// Reset the sink.
|
||||
sink = (interface{})(nil)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user