mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-07 13:55:17 +00:00
p2p/conn: Add Bufferpool (#3664)
* use byte buffer pool to decreass allocs * wrap to put buffer in defer * wapper defer * add dependency * remove Gopkg,* * add change log
This commit is contained in:
@@ -2,6 +2,7 @@ package conn
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/cipher"
|
||||
crand "crypto/rand"
|
||||
"crypto/sha256"
|
||||
"crypto/subtle"
|
||||
@@ -17,6 +18,7 @@ import (
|
||||
"golang.org/x/crypto/curve25519"
|
||||
"golang.org/x/crypto/nacl/box"
|
||||
|
||||
pool "github.com/libp2p/go-buffer-pool"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"golang.org/x/crypto/hkdf"
|
||||
@@ -47,10 +49,11 @@ var (
|
||||
type SecretConnection struct {
|
||||
|
||||
// immutable
|
||||
recvSecret *[aeadKeySize]byte
|
||||
sendSecret *[aeadKeySize]byte
|
||||
remPubKey crypto.PubKey
|
||||
conn io.ReadWriteCloser
|
||||
recvAead cipher.AEAD
|
||||
sendAead cipher.AEAD
|
||||
|
||||
remPubKey crypto.PubKey
|
||||
conn io.ReadWriteCloser
|
||||
|
||||
// net.Conn must be thread safe:
|
||||
// https://golang.org/pkg/net/#Conn.
|
||||
@@ -102,14 +105,22 @@ func MakeSecretConnection(conn io.ReadWriteCloser, locPrivKey crypto.PrivKey) (*
|
||||
// generate the secret used for receiving, sending, challenge via hkdf-sha2 on dhSecret
|
||||
recvSecret, sendSecret, challenge := deriveSecretAndChallenge(dhSecret, locIsLeast)
|
||||
|
||||
sendAead, err := chacha20poly1305.New(sendSecret[:])
|
||||
if err != nil {
|
||||
return nil, errors.New("Invalid send SecretConnection Key")
|
||||
}
|
||||
recvAead, err := chacha20poly1305.New(recvSecret[:])
|
||||
if err != nil {
|
||||
return nil, errors.New("Invalid receive SecretConnection Key")
|
||||
}
|
||||
// Construct SecretConnection.
|
||||
sc := &SecretConnection{
|
||||
conn: conn,
|
||||
recvBuffer: nil,
|
||||
recvNonce: new([aeadNonceSize]byte),
|
||||
sendNonce: new([aeadNonceSize]byte),
|
||||
recvSecret: recvSecret,
|
||||
sendSecret: sendSecret,
|
||||
recvAead: recvAead,
|
||||
sendAead: sendAead,
|
||||
}
|
||||
|
||||
// Sign the challenge bytes for authentication.
|
||||
@@ -143,35 +154,39 @@ func (sc *SecretConnection) Write(data []byte) (n int, err error) {
|
||||
defer sc.sendMtx.Unlock()
|
||||
|
||||
for 0 < len(data) {
|
||||
var frame = make([]byte, totalFrameSize)
|
||||
var chunk []byte
|
||||
if dataMaxSize < len(data) {
|
||||
chunk = data[:dataMaxSize]
|
||||
data = data[dataMaxSize:]
|
||||
} else {
|
||||
chunk = data
|
||||
data = nil
|
||||
}
|
||||
chunkLength := len(chunk)
|
||||
binary.LittleEndian.PutUint32(frame, uint32(chunkLength))
|
||||
copy(frame[dataLenSize:], chunk)
|
||||
if err := func() error {
|
||||
var sealedFrame = pool.Get(aeadSizeOverhead + totalFrameSize)
|
||||
var frame = pool.Get(totalFrameSize)
|
||||
defer func() {
|
||||
pool.Put(sealedFrame)
|
||||
pool.Put(frame)
|
||||
}()
|
||||
var chunk []byte
|
||||
if dataMaxSize < len(data) {
|
||||
chunk = data[:dataMaxSize]
|
||||
data = data[dataMaxSize:]
|
||||
} else {
|
||||
chunk = data
|
||||
data = nil
|
||||
}
|
||||
chunkLength := len(chunk)
|
||||
binary.LittleEndian.PutUint32(frame, uint32(chunkLength))
|
||||
copy(frame[dataLenSize:], chunk)
|
||||
|
||||
aead, err := chacha20poly1305.New(sc.sendSecret[:])
|
||||
if err != nil {
|
||||
return n, errors.New("Invalid SecretConnection Key")
|
||||
}
|
||||
// encrypt the frame
|
||||
sc.sendAead.Seal(sealedFrame[:0], sc.sendNonce[:], frame, nil)
|
||||
incrNonce(sc.sendNonce)
|
||||
// end encryption
|
||||
|
||||
// encrypt the frame
|
||||
var sealedFrame = make([]byte, aeadSizeOverhead+totalFrameSize)
|
||||
aead.Seal(sealedFrame[:0], sc.sendNonce[:], frame, nil)
|
||||
incrNonce(sc.sendNonce)
|
||||
// end encryption
|
||||
|
||||
_, err = sc.conn.Write(sealedFrame)
|
||||
if err != nil {
|
||||
_, err = sc.conn.Write(sealedFrame)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n += len(chunk)
|
||||
return nil
|
||||
}(); err != nil {
|
||||
return n, err
|
||||
}
|
||||
n += len(chunk)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -189,21 +204,18 @@ func (sc *SecretConnection) Read(data []byte) (n int, err error) {
|
||||
}
|
||||
|
||||
// read off the conn
|
||||
sealedFrame := make([]byte, totalFrameSize+aeadSizeOverhead)
|
||||
var sealedFrame = pool.Get(aeadSizeOverhead + totalFrameSize)
|
||||
defer pool.Put(sealedFrame)
|
||||
_, err = io.ReadFull(sc.conn, sealedFrame)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
aead, err := chacha20poly1305.New(sc.recvSecret[:])
|
||||
if err != nil {
|
||||
return n, errors.New("Invalid SecretConnection Key")
|
||||
}
|
||||
|
||||
// decrypt the frame.
|
||||
// reads and updates the sc.recvNonce
|
||||
var frame = make([]byte, totalFrameSize)
|
||||
_, err = aead.Open(frame[:0], sc.recvNonce[:], sealedFrame, nil)
|
||||
var frame = pool.Get(totalFrameSize)
|
||||
defer pool.Put(frame)
|
||||
_, err = sc.recvAead.Open(frame[:0], sc.recvNonce[:], sealedFrame, nil)
|
||||
if err != nil {
|
||||
return n, errors.New("Failed to decrypt SecretConnection")
|
||||
}
|
||||
@@ -218,7 +230,10 @@ func (sc *SecretConnection) Read(data []byte) (n int, err error) {
|
||||
}
|
||||
var chunk = frame[dataLenSize : dataLenSize+chunkLength]
|
||||
n = copy(data, chunk)
|
||||
sc.recvBuffer = chunk[n:]
|
||||
if n < len(chunk) {
|
||||
sc.recvBuffer = make([]byte, len(chunk)-n)
|
||||
copy(sc.recvBuffer, chunk[n:])
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -383,10 +383,23 @@ func createGoldenTestVectors(t *testing.T) string {
|
||||
return data
|
||||
}
|
||||
|
||||
func BenchmarkSecretConnection(b *testing.B) {
|
||||
func BenchmarkWriteSecretConnection(b *testing.B) {
|
||||
b.StopTimer()
|
||||
b.ReportAllocs()
|
||||
fooSecConn, barSecConn := makeSecretConnPair(b)
|
||||
fooWriteText := cmn.RandStr(dataMaxSize)
|
||||
randomMsgSizes := []int{
|
||||
dataMaxSize / 10,
|
||||
dataMaxSize / 3,
|
||||
dataMaxSize / 2,
|
||||
dataMaxSize,
|
||||
dataMaxSize * 3 / 2,
|
||||
dataMaxSize * 2,
|
||||
dataMaxSize * 7 / 2,
|
||||
}
|
||||
fooWriteBytes := make([][]byte, 0, len(randomMsgSizes))
|
||||
for _, size := range randomMsgSizes {
|
||||
fooWriteBytes = append(fooWriteBytes, cmn.RandBytes(size))
|
||||
}
|
||||
// Consume reads from bar's reader
|
||||
go func() {
|
||||
readBuffer := make([]byte, dataMaxSize)
|
||||
@@ -402,7 +415,8 @@ func BenchmarkSecretConnection(b *testing.B) {
|
||||
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := fooSecConn.Write([]byte(fooWriteText))
|
||||
idx := cmn.RandIntn(len(fooWriteBytes))
|
||||
_, err := fooSecConn.Write(fooWriteBytes[idx])
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to write to fooSecConn: %v", err)
|
||||
}
|
||||
@@ -414,3 +428,44 @@ func BenchmarkSecretConnection(b *testing.B) {
|
||||
}
|
||||
//barSecConn.Close() race condition
|
||||
}
|
||||
|
||||
func BenchmarkReadSecretConnection(b *testing.B) {
|
||||
b.StopTimer()
|
||||
b.ReportAllocs()
|
||||
fooSecConn, barSecConn := makeSecretConnPair(b)
|
||||
randomMsgSizes := []int{
|
||||
dataMaxSize / 10,
|
||||
dataMaxSize / 3,
|
||||
dataMaxSize / 2,
|
||||
dataMaxSize,
|
||||
dataMaxSize * 3 / 2,
|
||||
dataMaxSize * 2,
|
||||
dataMaxSize * 7 / 2,
|
||||
}
|
||||
fooWriteBytes := make([][]byte, 0, len(randomMsgSizes))
|
||||
for _, size := range randomMsgSizes {
|
||||
fooWriteBytes = append(fooWriteBytes, cmn.RandBytes(size))
|
||||
}
|
||||
go func() {
|
||||
for i := 0; i < b.N; i++ {
|
||||
idx := cmn.RandIntn(len(fooWriteBytes))
|
||||
_, err := fooSecConn.Write(fooWriteBytes[idx])
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to write to fooSecConn: %v, %v,%v", err, i, b.N)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
readBuffer := make([]byte, dataMaxSize)
|
||||
_, err := barSecConn.Read(readBuffer)
|
||||
|
||||
if err == io.EOF {
|
||||
return
|
||||
} else if err != nil {
|
||||
b.Fatalf("Failed to read from barSecConn: %v", err)
|
||||
}
|
||||
}
|
||||
b.StopTimer()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user