lite2: replace primary provider with alternative when unavailable (#4354)

Closes issue #4338

Uses a wrapper function around both the signedHeader and validatorSet calls to the primary provider which attempts to retrieve the information 5 times before deeming the provider unavailable and replacing the primary provider with the first alternative before trying recursively again (until all alternatives are depleted)

Employs a mutex lock for any operations involving the providers of the light client to ensure no operations occurs whilst the new primary is chosen.

Commits:

* created swapProvider function

* eliminates old primary provider after replacement. Uses a mutex when changing providers

* renamed to replaceProvider

* created wrapped functions for signed header and val set

* created test for primary provider replacement

* implemented suggested revisions

* created Witnesses() and Primary()

* modified backoffAndJitterTime

* modified backoffAndJitterTime

* changed backoff base and jitter to functional arguments

* implemented suggested changes

* removed backoff function

* changed exp function to match go version

* halved the backoff time

* removed seeding and added comments

* fixed incorrect test

* extract backoff timeout calc into a function

Co-authored-by: Anton Kaliaev <anton.kalyaev@gmail.com>
This commit is contained in:
Callum Waters
2020-02-04 13:02:20 +01:00
committed by GitHub
parent 9b9f1beef3
commit df3eee455c
3 changed files with 179 additions and 14 deletions

View File

@@ -3,6 +3,7 @@ package lite
import (
"bytes"
"fmt"
"math/rand"
"sync"
"time"
@@ -51,6 +52,7 @@ const (
defaultUpdatePeriod = 5 * time.Second
defaultRemoveNoLongerTrustedHeadersPeriod = 24 * time.Hour
maxRetryAttempts = 10
)
// Option sets a parameter for the light client.
@@ -128,9 +130,10 @@ type Client struct {
verificationMode mode
trustLevel tmmath.Fraction
// Mutex for locking during changes of the lite clients providers
providerMutex sync.Mutex
// Primary provider of new headers.
primary provider.Provider
// See Witnesses option
witnesses []provider.Provider
@@ -275,7 +278,7 @@ func (c *Client) checkTrustedHeaderUsingOptions(options TrustOptions) error {
var primaryHash []byte
switch {
case options.Height > c.trustedHeader.Height:
h, err := c.primary.SignedHeader(c.trustedHeader.Height)
h, err := c.signedHeaderFromPrimary(c.trustedHeader.Height)
if err != nil {
return err
}
@@ -327,7 +330,7 @@ func (c *Client) checkTrustedHeaderUsingOptions(options TrustOptions) error {
// Fetch trustedHeader and trustedNextVals from primary provider.
func (c *Client) initializeWithTrustOptions(options TrustOptions) error {
// 1) Fetch and verify the header.
h, err := c.primary.SignedHeader(options.Height)
h, err := c.signedHeaderFromPrimary(options.Height)
if err != nil {
return err
}
@@ -342,7 +345,7 @@ func (c *Client) initializeWithTrustOptions(options TrustOptions) error {
}
// 2) Fetch and verify the vals.
vals, err := c.primary.ValidatorSet(options.Height)
vals, err := c.validatorSetFromPrimary(options.Height)
if err != nil {
return err
}
@@ -360,7 +363,7 @@ func (c *Client) initializeWithTrustOptions(options TrustOptions) error {
// 3) Fetch and verify the next vals (verification happens in
// updateTrustedHeaderAndVals).
nextVals, err := c.primary.ValidatorSet(options.Height + 1)
nextVals, err := c.validatorSetFromPrimary(options.Height + 1)
if err != nil {
return err
}
@@ -549,13 +552,31 @@ func (c *Client) VerifyHeader(newHeader *types.SignedHeader, newVals *types.Vali
}
// Update trusted header and vals.
nextVals, err := c.primary.ValidatorSet(newHeader.Height + 1)
nextVals, err := c.validatorSetFromPrimary(newHeader.Height + 1)
if err != nil {
return err
}
return c.updateTrustedHeaderAndVals(newHeader, nextVals)
}
// Primary returns the primary provider.
//
// NOTE: provider may be not safe for concurrent access.
func (c *Client) Primary() provider.Provider {
c.providerMutex.Lock()
defer c.providerMutex.Unlock()
return c.primary
}
// Witnesses returns the witness providers.
//
// NOTE: providers may be not safe for concurrent access.
func (c *Client) Witnesses() []provider.Provider {
c.providerMutex.Lock()
defer c.providerMutex.Unlock()
return c.witnesses
}
// Cleanup removes all the data (headers and validator sets) stored. Note: the
// client must be stopped at this point.
func (c *Client) Cleanup() error {
@@ -608,7 +629,7 @@ func (c *Client) sequence(newHeader *types.SignedHeader, newVals *types.Validato
err error
)
for height := c.trustedHeader.Height + 1; height < newHeader.Height; height++ {
interimHeader, err = c.primary.SignedHeader(height)
interimHeader, err = c.signedHeaderFromPrimary(height)
if err != nil {
return errors.Wrapf(err, "failed to obtain the header #%d", height)
}
@@ -628,7 +649,7 @@ func (c *Client) sequence(newHeader *types.SignedHeader, newVals *types.Validato
if height == newHeader.Height-1 {
nextVals = newVals
} else {
nextVals, err = c.primary.ValidatorSet(height + 1)
nextVals, err = c.validatorSetFromPrimary(height + 1)
if err != nil {
return errors.Wrapf(err, "failed to obtain the vals #%d", height+1)
}
@@ -682,7 +703,7 @@ func (c *Client) bisection(
// right branch
{
nextVals, err := c.primary.ValidatorSet(pivot + 1)
nextVals, err := c.validatorSetFromPrimary(pivot + 1)
if err != nil {
return errors.Wrapf(err, "failed to obtain the vals #%d", pivot+1)
}
@@ -725,11 +746,11 @@ func (c *Client) updateTrustedHeaderAndVals(h *types.SignedHeader, nextVals *typ
// fetch header and validators for the given height from primary provider.
func (c *Client) fetchHeaderAndValsAtHeight(height int64) (*types.SignedHeader, *types.ValidatorSet, error) {
h, err := c.primary.SignedHeader(height)
h, err := c.signedHeaderFromPrimary(height)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to obtain the header #%d", height)
}
vals, err := c.primary.ValidatorSet(height)
vals, err := c.validatorSetFromPrimary(height)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to obtain the vals #%d", height)
}
@@ -738,12 +759,15 @@ func (c *Client) fetchHeaderAndValsAtHeight(height int64) (*types.SignedHeader,
// compare header with one from a random witness.
func (c *Client) compareNewHeaderWithRandomWitness(h *types.SignedHeader) error {
c.providerMutex.Lock()
// 0. Check witnesses exist
if len(c.witnesses) == 0 {
return errors.New("could not find any witnesses")
}
// 1. Pick a witness.
witness := c.witnesses[tmrand.Intn(len(c.witnesses))]
c.providerMutex.Unlock()
// 2. Fetch the header.
altH, err := witness.SignedHeader(h.Height)
@@ -871,3 +895,66 @@ func (c *Client) Update(now time.Time) error {
return nil
}
// replaceProvider takes the first alternative provider and promotes it as the primary provider
func (c *Client) replacePrimaryProvider() error {
c.providerMutex.Lock()
defer c.providerMutex.Unlock()
if len(c.witnesses) == 0 {
return errors.Errorf("no witnesses left")
}
c.primary = c.witnesses[0]
c.witnesses = c.witnesses[1:]
c.logger.Info("New primary", "p", c.primary)
return nil
}
// signedHeaderFromPrimary retrieves the SignedHeader from the primary provider at the specified height.
// Handles dropout by the primary provider by swapping with an alternative provider
func (c *Client) signedHeaderFromPrimary(height int64) (*types.SignedHeader, error) {
for attempt := 1; attempt <= maxRetryAttempts; attempt++ {
c.providerMutex.Lock()
h, err := c.primary.SignedHeader(height)
c.providerMutex.Unlock()
if err == nil || err == provider.ErrSignedHeaderNotFound {
return h, err
}
time.Sleep(backoffTimeout(attempt))
}
c.logger.Info("Primary is unavailable. Replacing with the first witness")
err := c.replacePrimaryProvider()
if err != nil {
return nil, err
}
return c.signedHeaderFromPrimary(height)
}
// validatorSetFromPrimary retrieves the ValidatorSet from the primary provider at the specified height.
// Handles dropout by the primary provider after 5 attempts by replacing it with an alternative provider
func (c *Client) validatorSetFromPrimary(height int64) (*types.ValidatorSet, error) {
for attempt := 1; attempt <= maxRetryAttempts; attempt++ {
c.providerMutex.Lock()
h, err := c.primary.ValidatorSet(height)
c.providerMutex.Unlock()
if err == nil || err == provider.ErrValidatorSetNotFound {
return h, err
}
time.Sleep(backoffTimeout(attempt))
}
c.logger.Info("Primary is unavailable. Replacing with the first witness")
err := c.replacePrimaryProvider()
if err != nil {
return nil, err
}
return c.validatorSetFromPrimary(height)
}
// exponential backoff (with jitter)
// 0.5s -> 2s -> 4.5s -> 8s -> 12.5 with 1s variation
func backoffTimeout(attempt int) time.Duration {
return time.Duration(500*attempt*attempt)*time.Millisecond + time.Duration(rand.Intn(1000))*time.Millisecond
}

View File

@@ -867,3 +867,59 @@ func TestClient_Concurrency(t *testing.T) {
wg.Wait()
}
func TestProvider_Replacement(t *testing.T) {
const (
chainID = "TestProvider_Replacement"
)
var (
keys = genPrivKeys(4)
// 20, 30, 40, 50 - the first 3 don't have 2/3, the last 3 do!
vals = keys.ToValidators(20, 10)
bTime, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z")
header = keys.GenSignedHeader(chainID, 1, bTime, nil, vals, vals,
[]byte("app_hash"), []byte("cons_hash"), []byte("results_hash"), 0, len(keys))
primary = mockp.NewDeadMock(chainID)
witness = mockp.New(
chainID,
map[int64]*types.SignedHeader{
// trusted header
1: header,
// interim header (3/3 signed)
2: keys.GenSignedHeader(chainID, 2, bTime.Add(30*time.Minute), nil, vals, vals,
[]byte("app_hash"), []byte("cons_hash"), []byte("results_hash"), 0, len(keys)),
// last header (3/3 signed)
3: keys.GenSignedHeader(chainID, 3, bTime.Add(1*time.Hour), nil, vals, vals,
[]byte("app_hash"), []byte("cons_hash"), []byte("results_hash"), 0, len(keys)),
},
map[int64]*types.ValidatorSet{
1: vals,
2: vals,
3: vals,
4: vals,
},
)
)
c, err := NewClient(
chainID,
TrustOptions{
Period: 4 * time.Hour,
Height: 1,
Hash: header.Hash(),
},
primary,
[]provider.Provider{witness},
dbs.New(dbm.NewMemDB(), chainID),
UpdatePeriod(0),
Logger(log.TestingLogger()),
)
require.NoError(t, err)
err = c.Start()
require.NoError(t, err)
defer c.Stop()
assert.NotEqual(t, c.Primary(), primary)
assert.Equal(t, 0, len(c.Witnesses()))
}

View File

@@ -1,19 +1,20 @@
package mock
import (
"github.com/pkg/errors"
"github.com/tendermint/tendermint/lite2/provider"
"github.com/tendermint/tendermint/types"
)
// mock provider allows to directly set headers & vals, which can be handy when
// testing.
type mock struct {
chainID string
headers map[int64]*types.SignedHeader
vals map[int64]*types.ValidatorSet
}
// New creates a mock provider.
// New creates a mock provider with the given set of headers and validator
// sets.
func New(chainID string, headers map[int64]*types.SignedHeader, vals map[int64]*types.ValidatorSet) provider.Provider {
return &mock{
chainID: chainID,
@@ -45,3 +46,24 @@ func (p *mock) ValidatorSet(height int64) (*types.ValidatorSet, error) {
}
return nil, provider.ErrValidatorSetNotFound
}
type deadMock struct {
chainID string
}
// NewDeadMock creates a mock provider that always errors.
func NewDeadMock(chainID string) provider.Provider {
return &deadMock{chainID: chainID}
}
func (p *deadMock) ChainID() string {
return p.chainID
}
func (p *deadMock) SignedHeader(height int64) (*types.SignedHeader, error) {
return nil, errors.New("no response from provider")
}
func (p *deadMock) ValidatorSet(height int64) (*types.ValidatorSet, error) {
return nil, errors.New("no response from provider")
}