diff --git a/pkg/madmin/api-error-response.go b/pkg/madmin/api-error-response.go index e65d9217e..c328c33f9 100644 --- a/pkg/madmin/api-error-response.go +++ b/pkg/madmin/api-error-response.go @@ -77,6 +77,28 @@ func httpRespToErrorResponse(resp *http.Response) error { return errResp } +// ToErrorResponse - Returns parsed ErrorResponse struct from body and +// http headers. +// +// For example: +// +// import admin "github.com/minio/minio/pkg/madmin" +// ... +// ... +// ss, err := adm.ServiceStatus(...) +// if err != nil { +// resp := admin.ToErrorResponse(err) +// } +// ... +func ToErrorResponse(err error) ErrorResponse { + switch err := err.(type) { + case ErrorResponse: + return err + default: + return ErrorResponse{} + } +} + // ErrInvalidArgument - Invalid argument response. func ErrInvalidArgument(message string) error { return ErrorResponse{ diff --git a/pkg/madmin/api.go b/pkg/madmin/api.go index 7e601ef7f..3c9189548 100644 --- a/pkg/madmin/api.go +++ b/pkg/madmin/api.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "io/ioutil" + "math/rand" "net/http" "net/http/httputil" "net/url" @@ -29,6 +30,7 @@ import ( "regexp" "runtime" "strings" + "time" "github.com/minio/minio-go/pkg/s3signer" "github.com/minio/minio-go/pkg/s3utils" @@ -57,6 +59,8 @@ type AdminClient struct { // Needs allocation. httpClient *http.Client + random *rand.Rand + // Advanced functionality. isTraceEnabled bool traceOutput io.Writer @@ -107,6 +111,8 @@ func privateNew(endpoint, accessKeyID, secretAccessKey string, secure bool) (*Ad httpClient: &http.Client{ Transport: http.DefaultTransport, }, + // Introduce a new locked random seed. + random: rand.New(&lockedRandSource{src: rand.NewSource(time.Now().UTC().UnixNano())}), } // Return. @@ -315,6 +321,7 @@ var successStatus = []int{ // request upon any error up to maxRetries attempts in a binomially // delayed manner using a standard back off algorithm. func (adm AdminClient) executeMethod(method string, reqData requestData) (res *http.Response, err error) { + var reqRetry = MaxRetry // Indicates how many times we can retry the request // Create a done channel to control 'ListObjects' go routine. doneCh := make(chan struct{}, 1) @@ -322,39 +329,63 @@ func (adm AdminClient) executeMethod(method string, reqData requestData) (res *h // Indicate to our routine to exit cleanly upon return. defer close(doneCh) - // Instantiate a new request. - var req *http.Request - req, err = adm.newRequest(method, reqData) - if err != nil { - return nil, err - } - - // Initiate the request. - res, err = adm.do(req) - if err != nil { - return nil, err - } - - // For any known successful http status, return quickly. - for _, httpStatus := range successStatus { - if httpStatus == res.StatusCode { - return res, nil + for range adm.newRetryTimer(reqRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter, doneCh) { + // Instantiate a new request. + var req *http.Request + req, err = adm.newRequest(method, reqData) + if err != nil { + return nil, err } + + // Initiate the request. + res, err = adm.do(req) + if err != nil { + // For supported http requests errors verify. + if isHTTPReqErrorRetryable(err) { + continue // Retry. + } + // For other errors, return here no need to retry. + return nil, err + } + + // For any known successful http status, return quickly. + for _, httpStatus := range successStatus { + if httpStatus == res.StatusCode { + return res, nil + } + } + + // Read the body to be saved later. + errBodyBytes, err := ioutil.ReadAll(res.Body) + // res.Body should be closed + closeResponse(res) + if err != nil { + return nil, err + } + + // Save the body. + errBodySeeker := bytes.NewReader(errBodyBytes) + res.Body = ioutil.NopCloser(errBodySeeker) + + // For errors verify if its retryable otherwise fail quickly. + errResponse := ToErrorResponse(httpRespToErrorResponse(res)) + + // Save the body back again. + errBodySeeker.Seek(0, 0) // Seek back to starting point. + res.Body = ioutil.NopCloser(errBodySeeker) + + // Verify if error response code is retryable. + if isS3CodeRetryable(errResponse.Code) { + continue // Retry. + } + + // Verify if http status code is retryable. + if isHTTPStatusRetryable(res.StatusCode) { + continue // Retry. + } + + break } - - // Read the body to be saved later. - errBodyBytes, err := ioutil.ReadAll(res.Body) - if err != nil { - return nil, err - } - // Save the body. - errBodySeeker := bytes.NewReader(errBodyBytes) - res.Body = ioutil.NopCloser(errBodySeeker) - - // Save the body back again. - errBodySeeker.Seek(0, 0) // Seek back to starting point. - res.Body = ioutil.NopCloser(errBodySeeker) - return res, err } diff --git a/pkg/madmin/retry.go b/pkg/madmin/retry.go new file mode 100644 index 000000000..8ad347ab2 --- /dev/null +++ b/pkg/madmin/retry.go @@ -0,0 +1,175 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package madmin + +import ( + "math/rand" + "net" + "net/http" + "net/url" + "strings" + "sync" + "time" +) + +// MaxRetry is the maximum number of retries before stopping. +var MaxRetry = 10 + +// MaxJitter will randomize over the full exponential backoff time +const MaxJitter = 1.0 + +// NoJitter disables the use of jitter for randomizing the exponential backoff time +const NoJitter = 0.0 + +// DefaultRetryUnit - default unit multiplicative per retry. +// defaults to 1 second. +const DefaultRetryUnit = time.Second + +// DefaultRetryCap - Each retry attempt never waits no longer than +// this maximum time duration. +const DefaultRetryCap = time.Second * 30 + +// lockedRandSource provides protected rand source, implements rand.Source interface. +type lockedRandSource struct { + lk sync.Mutex + src rand.Source +} + +// Int63 returns a non-negative pseudo-random 63-bit integer as an int64. +func (r *lockedRandSource) Int63() (n int64) { + r.lk.Lock() + n = r.src.Int63() + r.lk.Unlock() + return +} + +// Seed uses the provided seed value to initialize the generator to a +// deterministic state. +func (r *lockedRandSource) Seed(seed int64) { + r.lk.Lock() + r.src.Seed(seed) + r.lk.Unlock() +} + +// newRetryTimer creates a timer with exponentially increasing +// delays until the maximum retry attempts are reached. +func (adm AdminClient) newRetryTimer(maxRetry int, unit time.Duration, cap time.Duration, jitter float64, doneCh chan struct{}) <-chan int { + attemptCh := make(chan int) + + // computes the exponential backoff duration according to + // https://www.awsarchitectureblog.com/2015/03/backoff.html + exponentialBackoffWait := func(attempt int) time.Duration { + // normalize jitter to the range [0, 1.0] + if jitter < NoJitter { + jitter = NoJitter + } + if jitter > MaxJitter { + jitter = MaxJitter + } + + //sleep = random_between(0, min(cap, base * 2 ** attempt)) + sleep := unit * time.Duration(1< cap { + sleep = cap + } + if jitter != NoJitter { + sleep -= time.Duration(adm.random.Float64() * float64(sleep) * jitter) + } + return sleep + } + + go func() { + defer close(attemptCh) + for i := 0; i < maxRetry; i++ { + select { + // Attempts start from 1. + case attemptCh <- i + 1: + case <-doneCh: + // Stop the routine. + return + } + time.Sleep(exponentialBackoffWait(i)) + } + }() + return attemptCh +} + +// isHTTPReqErrorRetryable - is http requests error retryable, such +// as i/o timeout, connection broken etc.. +func isHTTPReqErrorRetryable(err error) bool { + if err == nil { + return false + } + switch e := err.(type) { + case *url.Error: + switch e.Err.(type) { + case *net.DNSError, *net.OpError, net.UnknownNetworkError: + return true + } + if strings.Contains(err.Error(), "Connection closed by foreign host") { + return true + } else if strings.Contains(err.Error(), "net/http: TLS handshake timeout") { + // If error is - tlsHandshakeTimeoutError, retry. + return true + } else if strings.Contains(err.Error(), "i/o timeout") { + // If error is - tcp timeoutError, retry. + return true + } else if strings.Contains(err.Error(), "connection timed out") { + // If err is a net.Dial timeout, retry. + return true + } else if strings.Contains(err.Error(), "net/http: HTTP/1.x transport connection broken") { + // If error is transport connection broken, retry. + return true + } + } + return false +} + +// List of AWS S3 error codes which are retryable. +var retryableS3Codes = map[string]struct{}{ + "RequestError": {}, + "RequestTimeout": {}, + "Throttling": {}, + "ThrottlingException": {}, + "RequestLimitExceeded": {}, + "RequestThrottled": {}, + "InternalError": {}, + "SlowDown": {}, + // Add more AWS S3 codes here. +} + +// isS3CodeRetryable - is s3 error code retryable. +func isS3CodeRetryable(s3Code string) (ok bool) { + _, ok = retryableS3Codes[s3Code] + return ok +} + +// List of HTTP status codes which are retryable. +var retryableHTTPStatusCodes = map[int]struct{}{ + http.StatusTooManyRequests: {}, + http.StatusInternalServerError: {}, + http.StatusBadGateway: {}, + http.StatusServiceUnavailable: {}, + // Add more HTTP status codes here. +} + +// isHTTPStatusRetryable - is HTTP error code retryable. +func isHTTPStatusRetryable(httpStatusCode int) (ok bool) { + _, ok = retryableHTTPStatusCodes[httpStatusCode] + return ok +}