diff --git a/internal/rest/client.go b/internal/rest/client.go index 74df8872c..047fbedf9 100644 --- a/internal/rest/client.go +++ b/internal/rest/client.go @@ -75,9 +75,9 @@ type Client struct { // is online or offline. HealthCheckFn func() bool - // HealthCheckInterval will be the duration between re-connection attempts - // when a call has failed with a network error. - HealthCheckInterval time.Duration + // HealthCheckRetryUnit will be used to calculate the exponential + // backoff when trying to reconnect to an offline node + HealthCheckReconnectUnit time.Duration // HealthCheckTimeout determines timeout for each call. HealthCheckTimeout time.Duration @@ -309,14 +309,14 @@ func NewClient(url *url.URL, tr http.RoundTripper, newAuthToken func(aud string) // Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper // except custom DialContext and TLSClientConfig. return &Client{ - httpClient: &http.Client{Transport: tr}, - url: url, - newAuthToken: newAuthToken, - connected: online, - lastConn: time.Now().UnixNano(), - MaxErrResponseSize: 4096, - HealthCheckInterval: 200 * time.Millisecond, - HealthCheckTimeout: time.Second, + httpClient: &http.Client{Transport: tr}, + url: url, + newAuthToken: newAuthToken, + connected: online, + lastConn: time.Now().UnixNano(), + MaxErrResponseSize: 4096, + HealthCheckReconnectUnit: 200 * time.Millisecond, + HealthCheckTimeout: time.Second, } } @@ -337,6 +337,28 @@ func (c *Client) LastError() error { return c.lastErr } +// computes the exponential backoff duration according to +// https://www.awsarchitectureblog.com/2015/03/backoff.html +func exponentialBackoffWait(r *rand.Rand, unit, cap time.Duration) func(uint) time.Duration { + if unit > time.Hour { + // Protect against integer overflow + panic("unit cannot exceed one hour") + } + return func(attempt uint) time.Duration { + if attempt > 16 { + // Protect against integer overflow + attempt = 16 + } + // sleep = random_between(unit, min(cap, base * 2 ** attempt)) + sleep := unit * time.Duration(1< cap { + sleep = cap + } + sleep -= time.Duration(r.Float64() * float64(sleep-unit)) + return sleep + } +} + // MarkOffline - will mark a client as being offline and spawns // a goroutine that will attempt to reconnect if HealthCheckFn is set. // returns true if the node changed state from online to offline @@ -347,8 +369,14 @@ func (c *Client) MarkOffline(err error) bool { // Start goroutine that will attempt to reconnect. // If server is already trying to reconnect this will have no effect. if c.HealthCheckFn != nil && atomic.CompareAndSwapInt32(&c.connected, online, offline) { - r := rand.New(rand.NewSource(time.Now().UnixNano())) go func() { + backOff := exponentialBackoffWait( + rand.New(rand.NewSource(time.Now().UnixNano())), + 200*time.Millisecond, + 30*time.Second, + ) + + attempt := uint(0) for { if atomic.LoadInt32(&c.connected) == closed { return @@ -362,7 +390,8 @@ func (c *Client) MarkOffline(err error) bool { } return } - time.Sleep(time.Duration(r.Float64() * float64(c.HealthCheckInterval))) + attempt++ + time.Sleep(backOff(attempt)) } }() return true