diff --git a/lite2/client.go b/lite2/client.go index 4248d5eb8..c1dc88b1a 100644 --- a/lite2/client.go +++ b/lite2/client.go @@ -3,6 +3,7 @@ package lite import ( "bytes" "fmt" + "math/rand" "sync" "time" @@ -51,6 +52,7 @@ const ( defaultUpdatePeriod = 5 * time.Second defaultRemoveNoLongerTrustedHeadersPeriod = 24 * time.Hour + maxRetryAttempts = 10 ) // Option sets a parameter for the light client. @@ -128,9 +130,10 @@ type Client struct { verificationMode mode trustLevel tmmath.Fraction + // Mutex for locking during changes of the lite clients providers + providerMutex sync.Mutex // Primary provider of new headers. primary provider.Provider - // See Witnesses option witnesses []provider.Provider @@ -275,7 +278,7 @@ func (c *Client) checkTrustedHeaderUsingOptions(options TrustOptions) error { var primaryHash []byte switch { case options.Height > c.trustedHeader.Height: - h, err := c.primary.SignedHeader(c.trustedHeader.Height) + h, err := c.signedHeaderFromPrimary(c.trustedHeader.Height) if err != nil { return err } @@ -327,7 +330,7 @@ func (c *Client) checkTrustedHeaderUsingOptions(options TrustOptions) error { // Fetch trustedHeader and trustedNextVals from primary provider. func (c *Client) initializeWithTrustOptions(options TrustOptions) error { // 1) Fetch and verify the header. - h, err := c.primary.SignedHeader(options.Height) + h, err := c.signedHeaderFromPrimary(options.Height) if err != nil { return err } @@ -342,7 +345,7 @@ func (c *Client) initializeWithTrustOptions(options TrustOptions) error { } // 2) Fetch and verify the vals. - vals, err := c.primary.ValidatorSet(options.Height) + vals, err := c.validatorSetFromPrimary(options.Height) if err != nil { return err } @@ -360,7 +363,7 @@ func (c *Client) initializeWithTrustOptions(options TrustOptions) error { // 3) Fetch and verify the next vals (verification happens in // updateTrustedHeaderAndVals). - nextVals, err := c.primary.ValidatorSet(options.Height + 1) + nextVals, err := c.validatorSetFromPrimary(options.Height + 1) if err != nil { return err } @@ -549,13 +552,31 @@ func (c *Client) VerifyHeader(newHeader *types.SignedHeader, newVals *types.Vali } // Update trusted header and vals. - nextVals, err := c.primary.ValidatorSet(newHeader.Height + 1) + nextVals, err := c.validatorSetFromPrimary(newHeader.Height + 1) if err != nil { return err } return c.updateTrustedHeaderAndVals(newHeader, nextVals) } +// Primary returns the primary provider. +// +// NOTE: provider may be not safe for concurrent access. +func (c *Client) Primary() provider.Provider { + c.providerMutex.Lock() + defer c.providerMutex.Unlock() + return c.primary +} + +// Witnesses returns the witness providers. +// +// NOTE: providers may be not safe for concurrent access. +func (c *Client) Witnesses() []provider.Provider { + c.providerMutex.Lock() + defer c.providerMutex.Unlock() + return c.witnesses +} + // Cleanup removes all the data (headers and validator sets) stored. Note: the // client must be stopped at this point. func (c *Client) Cleanup() error { @@ -608,7 +629,7 @@ func (c *Client) sequence(newHeader *types.SignedHeader, newVals *types.Validato err error ) for height := c.trustedHeader.Height + 1; height < newHeader.Height; height++ { - interimHeader, err = c.primary.SignedHeader(height) + interimHeader, err = c.signedHeaderFromPrimary(height) if err != nil { return errors.Wrapf(err, "failed to obtain the header #%d", height) } @@ -628,7 +649,7 @@ func (c *Client) sequence(newHeader *types.SignedHeader, newVals *types.Validato if height == newHeader.Height-1 { nextVals = newVals } else { - nextVals, err = c.primary.ValidatorSet(height + 1) + nextVals, err = c.validatorSetFromPrimary(height + 1) if err != nil { return errors.Wrapf(err, "failed to obtain the vals #%d", height+1) } @@ -682,7 +703,7 @@ func (c *Client) bisection( // right branch { - nextVals, err := c.primary.ValidatorSet(pivot + 1) + nextVals, err := c.validatorSetFromPrimary(pivot + 1) if err != nil { return errors.Wrapf(err, "failed to obtain the vals #%d", pivot+1) } @@ -725,11 +746,11 @@ func (c *Client) updateTrustedHeaderAndVals(h *types.SignedHeader, nextVals *typ // fetch header and validators for the given height from primary provider. func (c *Client) fetchHeaderAndValsAtHeight(height int64) (*types.SignedHeader, *types.ValidatorSet, error) { - h, err := c.primary.SignedHeader(height) + h, err := c.signedHeaderFromPrimary(height) if err != nil { return nil, nil, errors.Wrapf(err, "failed to obtain the header #%d", height) } - vals, err := c.primary.ValidatorSet(height) + vals, err := c.validatorSetFromPrimary(height) if err != nil { return nil, nil, errors.Wrapf(err, "failed to obtain the vals #%d", height) } @@ -738,12 +759,15 @@ func (c *Client) fetchHeaderAndValsAtHeight(height int64) (*types.SignedHeader, // compare header with one from a random witness. func (c *Client) compareNewHeaderWithRandomWitness(h *types.SignedHeader) error { + c.providerMutex.Lock() + // 0. Check witnesses exist if len(c.witnesses) == 0 { return errors.New("could not find any witnesses") } // 1. Pick a witness. witness := c.witnesses[tmrand.Intn(len(c.witnesses))] + c.providerMutex.Unlock() // 2. Fetch the header. altH, err := witness.SignedHeader(h.Height) @@ -871,3 +895,66 @@ func (c *Client) Update(now time.Time) error { return nil } + +// replaceProvider takes the first alternative provider and promotes it as the primary provider +func (c *Client) replacePrimaryProvider() error { + c.providerMutex.Lock() + defer c.providerMutex.Unlock() + if len(c.witnesses) == 0 { + return errors.Errorf("no witnesses left") + } + c.primary = c.witnesses[0] + c.witnesses = c.witnesses[1:] + c.logger.Info("New primary", "p", c.primary) + return nil +} + +// signedHeaderFromPrimary retrieves the SignedHeader from the primary provider at the specified height. +// Handles dropout by the primary provider by swapping with an alternative provider +func (c *Client) signedHeaderFromPrimary(height int64) (*types.SignedHeader, error) { + for attempt := 1; attempt <= maxRetryAttempts; attempt++ { + c.providerMutex.Lock() + h, err := c.primary.SignedHeader(height) + c.providerMutex.Unlock() + if err == nil || err == provider.ErrSignedHeaderNotFound { + return h, err + } + time.Sleep(backoffTimeout(attempt)) + } + + c.logger.Info("Primary is unavailable. Replacing with the first witness") + err := c.replacePrimaryProvider() + if err != nil { + return nil, err + } + + return c.signedHeaderFromPrimary(height) +} + +// validatorSetFromPrimary retrieves the ValidatorSet from the primary provider at the specified height. +// Handles dropout by the primary provider after 5 attempts by replacing it with an alternative provider +func (c *Client) validatorSetFromPrimary(height int64) (*types.ValidatorSet, error) { + for attempt := 1; attempt <= maxRetryAttempts; attempt++ { + c.providerMutex.Lock() + h, err := c.primary.ValidatorSet(height) + c.providerMutex.Unlock() + if err == nil || err == provider.ErrValidatorSetNotFound { + return h, err + } + time.Sleep(backoffTimeout(attempt)) + } + + c.logger.Info("Primary is unavailable. Replacing with the first witness") + err := c.replacePrimaryProvider() + if err != nil { + return nil, err + } + + return c.validatorSetFromPrimary(height) +} + +// exponential backoff (with jitter) +// 0.5s -> 2s -> 4.5s -> 8s -> 12.5 with 1s variation +func backoffTimeout(attempt int) time.Duration { + return time.Duration(500*attempt*attempt)*time.Millisecond + time.Duration(rand.Intn(1000))*time.Millisecond +} diff --git a/lite2/client_test.go b/lite2/client_test.go index 9ddab1ead..d7145feb7 100644 --- a/lite2/client_test.go +++ b/lite2/client_test.go @@ -867,3 +867,59 @@ func TestClient_Concurrency(t *testing.T) { wg.Wait() } + +func TestProvider_Replacement(t *testing.T) { + const ( + chainID = "TestProvider_Replacement" + ) + + var ( + keys = genPrivKeys(4) + // 20, 30, 40, 50 - the first 3 don't have 2/3, the last 3 do! + vals = keys.ToValidators(20, 10) + bTime, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z") + header = keys.GenSignedHeader(chainID, 1, bTime, nil, vals, vals, + []byte("app_hash"), []byte("cons_hash"), []byte("results_hash"), 0, len(keys)) + primary = mockp.NewDeadMock(chainID) + witness = mockp.New( + chainID, + map[int64]*types.SignedHeader{ + // trusted header + 1: header, + // interim header (3/3 signed) + 2: keys.GenSignedHeader(chainID, 2, bTime.Add(30*time.Minute), nil, vals, vals, + []byte("app_hash"), []byte("cons_hash"), []byte("results_hash"), 0, len(keys)), + // last header (3/3 signed) + 3: keys.GenSignedHeader(chainID, 3, bTime.Add(1*time.Hour), nil, vals, vals, + []byte("app_hash"), []byte("cons_hash"), []byte("results_hash"), 0, len(keys)), + }, + map[int64]*types.ValidatorSet{ + 1: vals, + 2: vals, + 3: vals, + 4: vals, + }, + ) + ) + + c, err := NewClient( + chainID, + TrustOptions{ + Period: 4 * time.Hour, + Height: 1, + Hash: header.Hash(), + }, + primary, + []provider.Provider{witness}, + dbs.New(dbm.NewMemDB(), chainID), + UpdatePeriod(0), + Logger(log.TestingLogger()), + ) + require.NoError(t, err) + err = c.Start() + require.NoError(t, err) + defer c.Stop() + assert.NotEqual(t, c.Primary(), primary) + assert.Equal(t, 0, len(c.Witnesses())) + +} diff --git a/lite2/provider/mock/mock.go b/lite2/provider/mock/mock.go index f895420e5..f41358345 100644 --- a/lite2/provider/mock/mock.go +++ b/lite2/provider/mock/mock.go @@ -1,19 +1,20 @@ package mock import ( + "github.com/pkg/errors" + "github.com/tendermint/tendermint/lite2/provider" "github.com/tendermint/tendermint/types" ) -// mock provider allows to directly set headers & vals, which can be handy when -// testing. type mock struct { chainID string headers map[int64]*types.SignedHeader vals map[int64]*types.ValidatorSet } -// New creates a mock provider. +// New creates a mock provider with the given set of headers and validator +// sets. func New(chainID string, headers map[int64]*types.SignedHeader, vals map[int64]*types.ValidatorSet) provider.Provider { return &mock{ chainID: chainID, @@ -45,3 +46,24 @@ func (p *mock) ValidatorSet(height int64) (*types.ValidatorSet, error) { } return nil, provider.ErrValidatorSetNotFound } + +type deadMock struct { + chainID string +} + +// NewDeadMock creates a mock provider that always errors. +func NewDeadMock(chainID string) provider.Provider { + return &deadMock{chainID: chainID} +} + +func (p *deadMock) ChainID() string { + return p.chainID +} + +func (p *deadMock) SignedHeader(height int64) (*types.SignedHeader, error) { + return nil, errors.New("no response from provider") +} + +func (p *deadMock) ValidatorSet(height int64) (*types.ValidatorSet, error) { + return nil, errors.New("no response from provider") +}