From ca69da7533c06d2fae059a49a80a1c2b25425a86 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 1 Mar 2022 18:59:29 -0500 Subject: [PATCH] statesync: avoid compounding retry logic for fetching consensus parameters (backport #8032) (#8041) (cherry picked from commit a965f03c15df44c627e6a13b0bf86b8853888290) --- internal/statesync/reactor_test.go | 2 +- internal/statesync/stateprovider.go | 156 +++++++++++++++++----------- 2 files changed, 96 insertions(+), 62 deletions(-) diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index 759869f42..c55212e5d 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -560,7 +560,7 @@ func TestReactor_StateProviderP2P(t *testing.T) { require.NoError(t, err) rts.reactor.syncer.stateProvider = rts.reactor.stateProvider - actx, cancel := context.WithTimeout(bctx, 10*time.Second) + actx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() appHash, err := rts.reactor.stateProvider.AppHash(actx, 5) diff --git a/internal/statesync/stateprovider.go b/internal/statesync/stateprovider.go index 83a95981c..77be72e39 100644 --- a/internal/statesync/stateprovider.go +++ b/internal/statesync/stateprovider.go @@ -3,9 +3,10 @@ package statesync import ( "bytes" "context" - "errors" "fmt" + "math/rand" "strings" + "sync" "time" dbm "github.com/tendermint/tm-db" @@ -328,7 +329,7 @@ func (s *stateProviderP2P) State(ctx context.Context, height uint64) (sm.State, // We'll also need to fetch consensus params via P2P. 
state.ConsensusParams, err = s.consensusParams(ctx, currentLightBlock.Height) if err != nil { - return sm.State{}, err + return sm.State{}, fmt.Errorf("fetching consensus params: %w", err) } // validate the consensus params if !bytes.Equal(nextLightBlock.ConsensusHash, state.ConsensusParams.HashConsensusParams()) { @@ -352,73 +353,106 @@ func (s *stateProviderP2P) addProvider(p lightprovider.Provider) { // consensusParams sends out a request for consensus params blocking // until one is returned. // -// If it fails to get a valid set of consensus params from any of the -// providers it returns an error; however, it will retry indefinitely -// (with backoff) until the context is canceled. +// It attempts to send requests to all witnesses in parallel, but if +// none responds it will retry them all sometime later until it +// receives some response. This operation will block until it receives +// a response or the context is canceled. func (s *stateProviderP2P) consensusParams(ctx context.Context, height int64) (types.ConsensusParams, error) { - iterCount := 0 + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + out := make(chan types.ConsensusParams) + + retryAll := func() (<-chan struct{}, error) { + wg := &sync.WaitGroup{} + + for _, provider := range s.lc.Witnesses() { + p, ok := provider.(*BlockProvider) + if !ok { + return nil, fmt.Errorf("witness is not BlockProvider [%T]", provider) + } + + peer, err := types.NewNodeID(p.String()) + if err != nil { + return nil, fmt.Errorf("invalid provider (%s) node id: %w", p.String(), err) + } + + wg.Add(1) + go func(p *BlockProvider, peer types.NodeID, requestCh chan<- p2p.Envelope, responseCh <-chan types.ConsensusParams) { + defer wg.Done() + + timer := time.NewTimer(0) + defer timer.Stop() + var iterCount int64 + + for { + iterCount++ + select { + case s.paramsSendCh <- p2p.Envelope{ + To: peer, + Message: &ssproto.ParamsRequest{ + Height: uint64(height), + }, + }: + case <-ctx.Done(): + return + } + + // 
jitter+backoff the retry loop + timer.Reset(time.Duration(iterCount)*consensusParamsResponseTimeout + + time.Duration(100*rand.Int63n(iterCount))*time.Millisecond) // nolint:gosec + + select { + case <-timer.C: + continue + case <-ctx.Done(): + return + case params, ok := <-responseCh: + if !ok { + return + } + select { + case <-ctx.Done(): + return + case out <- params: + return + } + } + } + + }(p, peer, s.paramsSendCh, s.paramsRecvCh) + } + sig := make(chan struct{}) + go func() { wg.Wait(); close(sig) }() + return sig, nil + } + + timer := time.NewTimer(0) + defer timer.Stop() + + var iterCount int64 for { - params, err := s.tryGetConsensusParamsFromWitnesses(ctx, height) + iterCount++ + sig, err := retryAll() if err != nil { return types.ConsensusParams{}, err } - if params != nil { - return *params, nil - } - iterCount++ - select { + case <-sig: + // jitter+backoff the retry loop + timer.Reset(time.Duration(iterCount)*consensusParamsResponseTimeout + + time.Duration(100*rand.Int63n(iterCount))*time.Millisecond) // nolint:gosec + select { + case param := <-out: + return param, nil + case <-ctx.Done(): + return types.ConsensusParams{}, ctx.Err() + case <-timer.C: + } case <-ctx.Done(): return types.ConsensusParams{}, ctx.Err() - case <-time.After(time.Duration(iterCount) * consensusParamsResponseTimeout): + case param := <-out: + return param, nil } } } - -// tryGetConsensusParamsFromWitnesses attempts to get consensus -// parameters from the light clients available witnesses. If both -// return parameters are nil, then it can be retried. 
-func (s *stateProviderP2P) tryGetConsensusParamsFromWitnesses( - ctx context.Context, - height int64, -) (*types.ConsensusParams, error) { - for _, provider := range s.lc.Witnesses() { - p, ok := provider.(*BlockProvider) - if !ok { - panic("expected p2p state provider to use p2p block providers") - } - - // extract the nodeID of the provider - peer, err := types.NewNodeID(p.String()) - if err != nil { - return nil, fmt.Errorf("invalid provider (%s) node id: %w", p.String(), err) - } - - select { - case s.paramsSendCh <- p2p.Envelope{ - To: peer, - Message: &ssproto.ParamsRequest{ - Height: uint64(height), - }, - }: - case <-ctx.Done(): - return nil, ctx.Err() - } - - select { - // if we get no response from this provider we move on to the next one - case <-time.After(consensusParamsResponseTimeout): - continue - case <-ctx.Done(): - return nil, ctx.Err() - case params, ok := <-s.paramsRecvCh: - if !ok { - return nil, errors.New("params channel closed") - } - return &params, nil - } - } - - // signal to caller to retry. - return nil, nil -}