From 961f7dea820d66a4f2a40561addb4a6e92b5ba42 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 22 Jan 2024 12:16:36 -0800 Subject: [PATCH] compress binary while sending it to all the nodes (#18837) Also limit the amount of concurrency when sending binary updates to peers, avoid high network over TX that can cause disconnection events for the node sending updates. --- cmd/admin-handlers.go | 4 ++-- cmd/notification.go | 38 ++++++++++++++++++++++++++++++++------ cmd/peer-rest-server.go | 10 +++++++++- cmd/update.go | 30 ++++++++++++++++++++++++++---- 4 files changed, 69 insertions(+), 13 deletions(-) diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 46c546928..b8f6a8aaa 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -149,7 +149,7 @@ func (a adminAPIHandlers) ServerUpdateHandler(w http.ResponseWriter, r *http.Req } // Download Binary Once - bin, err := downloadBinary(u, mode) + binC, bin, err := downloadBinary(u, mode) if err != nil { logger.LogIf(ctx, fmt.Errorf("server update failed with %w", err)) writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) @@ -157,7 +157,7 @@ func (a adminAPIHandlers) ServerUpdateHandler(w http.ResponseWriter, r *http.Req } // Push binary to other servers - for _, nerr := range globalNotificationSys.VerifyBinary(ctx, u, sha256Sum, releaseInfo, bin) { + for _, nerr := range globalNotificationSys.VerifyBinary(ctx, u, sha256Sum, releaseInfo, binC) { if nerr.Err != nil { err := AdminError{ Code: AdminUpdateApplyFailure, diff --git a/cmd/notification.go b/cmd/notification.go index bd3e430f0..1fea03dd8 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -26,6 +26,7 @@ import ( "math/rand" "net/http" "net/url" + "runtime" "sync" "time" @@ -34,6 +35,7 @@ import ( "github.com/minio/madmin-go/v3" xnet "github.com/minio/pkg/v2/net" "github.com/minio/pkg/v2/sync/errgroup" + "github.com/minio/pkg/v2/workers" "github.com/minio/minio/internal/bucket/bandwidth" "github.com/minio/minio/internal/logger" @@ -59,7 +61,7 @@ type NotificationPeerErr struct { // // A zero NotificationGroup is valid and does not cancel on error. type NotificationGroup struct { - wg sync.WaitGroup + workers *workers.Workers errs []NotificationPeerErr retryCount int } @@ -67,7 +69,11 @@ type NotificationGroup struct { // WithNPeers returns a new NotificationGroup with length of errs slice upto nerrs, // upon Wait() errors are returned collected from all tasks. func WithNPeers(nerrs int) *NotificationGroup { - return &NotificationGroup{errs: make([]NotificationPeerErr, nerrs), retryCount: 3} + if nerrs <= 0 { + nerrs = 1 + } + wk, _ := workers.New(nerrs) + return &NotificationGroup{errs: make([]NotificationPeerErr, nerrs), workers: wk, retryCount: 3} } // WithRetries sets the retry count for all function calls from the Go method. @@ -81,7 +87,7 @@ func (g *NotificationGroup) WithRetries(retryCount int) *NotificationGroup { // Wait blocks until all function calls from the Go method have returned, then // returns the slice of errors from all function calls. func (g *NotificationGroup) Wait() []NotificationPeerErr { - g.wg.Wait() + g.workers.Wait() return g.errs } @@ -92,10 +98,11 @@ func (g *NotificationGroup) Wait() []NotificationPeerErr { func (g *NotificationGroup) Go(ctx context.Context, f func() error, index int, addr xnet.Host) { r := rand.New(rand.NewSource(time.Now().UnixNano())) - g.wg.Add(1) + g.workers.Take() go func() { - defer g.wg.Done() + defer g.workers.Give() + g.errs[index] = NotificationPeerErr{ Host: addr, } @@ -340,7 +347,26 @@ func (sys *NotificationSys) DownloadProfilingData(ctx context.Context, writer io // VerifyBinary - asks remote peers to verify the checksum func (sys *NotificationSys) VerifyBinary(ctx context.Context, u *url.URL, sha256Sum []byte, releaseInfo string, bin []byte) []NotificationPeerErr { - ng := WithNPeers(len(sys.peerClients)) + // FIXME: network calls made in this manner such as one goroutine per node, + // can easily eat into the internode bandwidth. This function would be mostly + // TX saturating, however there are situations where a RX might also saturate. + // To avoid these problems we must split the work at scale. With 1000 node + // setup becoming a reality we must try to shard the work properly such as + // pick 10 nodes that precisely can send those 100 requests the first node + // in the 10 node shard would coordinate between other 9 shards to get the + // rest of the `99*9` requests. + // + // This essentially splits the workload properly and also allows for network + // utilization to be optimal, instead of blindly throttling the way we are + // doing below. However the changes that are needed here are a bit involved, + // further discussion advised. Remove this comment and remove the worker model + // for this function in future. + maxWorkers := runtime.GOMAXPROCS(0) / 2 + if maxWorkers > len(sys.peerClients) { + maxWorkers = len(sys.peerClients) + } + + ng := WithNPeers(maxWorkers) for idx, client := range sys.peerClients { if client == nil { continue diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 26a561f52..0344053c4 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -33,6 +33,7 @@ import ( "time" "github.com/dustin/go-humanize" + "github.com/klauspost/compress/zstd" "github.com/minio/madmin-go/v3" b "github.com/minio/minio/internal/bucket/bandwidth" "github.com/minio/minio/internal/event" @@ -850,7 +851,14 @@ func (s *peerRESTServer) VerifyBinaryHandler(w http.ResponseWriter, r *http.Requ } releaseInfo := r.Form.Get(peerRESTReleaseInfo) - if err = verifyBinary(u, sha256Sum, releaseInfo, getMinioMode(), r.Body); err != nil { + zr, err := zstd.NewReader(r.Body) + if err != nil { + s.writeErrorResponse(w, err) + return + } + defer zr.Close() + + if err = verifyBinary(u, sha256Sum, releaseInfo, getMinioMode(), zr); err != nil { s.writeErrorResponse(w, err) return } diff --git a/cmd/update.go b/cmd/update.go index 45191c03f..313db811e 100644 --- a/cmd/update.go +++ b/cmd/update.go @@ -35,12 +35,14 @@ import ( "sync/atomic" "time" + "github.com/klauspost/compress/zstd" xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" "github.com/minio/pkg/v2/env" xnet "github.com/minio/pkg/v2/net" "github.com/minio/selfupdate" gopsutilcpu "github.com/shirou/gopsutil/v3/cpu" + "github.com/valyala/bytebufferpool" ) const ( @@ -510,20 +512,40 @@ func getUpdateReaderFromURL(u *url.URL, transport http.RoundTripper, mode string var updateInProgress atomic.Uint32 // Function to get the reader from an architecture -func downloadBinary(u *url.URL, mode string) (bin []byte, err error) { +func downloadBinary(u *url.URL, mode string) (binCompressed []byte, bin []byte, err error) { transport := getUpdateTransport(30 * time.Second) var reader io.ReadCloser if u.Scheme == "https" || u.Scheme == "http" { reader, err = getUpdateReaderFromURL(u, transport, mode) if err != nil { - return nil, err + return nil, nil, err } } else { - return nil, fmt.Errorf("unsupported protocol scheme: %s", u.Scheme) + return nil, nil, fmt.Errorf("unsupported protocol scheme: %s", u.Scheme) } defer xhttp.DrainBody(reader) - return io.ReadAll(reader) + b := bytebufferpool.Get() + bc := bytebufferpool.Get() + defer func() { + b.Reset() + bc.Reset() + + bytebufferpool.Put(b) + bytebufferpool.Put(bc) + }() + + w, err := zstd.NewWriter(bc) + if err != nil { + return nil, nil, err + } + + if _, err = io.Copy(w, io.TeeReader(reader, b)); err != nil { + return nil, nil, err + } + + w.Close() + return bc.Bytes(), b.Bytes(), nil } const (