From 2dbb1cff4ab2eec87bd46378716f6dd6ba2d22ab Mon Sep 17 00:00:00 2001 From: jiuker <2818723467@qq.com> Date: Thu, 6 Jul 2023 13:28:26 +0800 Subject: [PATCH] feat: support perf site replication (#17477) --- cmd/admin-handlers-site-replication.go | 48 +++++++++++++- cmd/admin-handlers.go | 43 +++++++++++++ cmd/admin-router.go | 11 +++- cmd/globals.go | 1 + cmd/perf-tests.go | 86 +++++++++++++++++++++++++- cmd/site-replication.go | 80 ++++++++++++++++++++++++ go.mod | 2 +- go.sum | 4 +- 8 files changed, 266 insertions(+), 9 deletions(-) diff --git a/cmd/admin-handlers-site-replication.go b/cmd/admin-handlers-site-replication.go index 5e9a2973c..0712452cc 100644 --- a/cmd/admin-handlers-site-replication.go +++ b/cmd/admin-handlers-site-replication.go @@ -20,16 +20,19 @@ package cmd import ( "bytes" "context" + "encoding/gob" "encoding/json" + "errors" "io" "net/http" "strings" + "sync/atomic" "time" + "github.com/dustin/go-humanize" "github.com/minio/madmin-go/v3" - "github.com/minio/mux" - "github.com/minio/minio/internal/logger" + "github.com/minio/mux" "github.com/minio/pkg/bucket/policy" iampolicy "github.com/minio/pkg/iam/policy" ) @@ -543,3 +546,44 @@ func (a adminAPIHandlers) SiteReplicationResyncOp(w http.ResponseWriter, r *http } writeSuccessResponseJSON(w, body) } + +// SiteReplicationDevNull - everything goes to io.Discard +// [POST] /minio/admin/v3/site-replication/devnull +func (a *adminAPIHandlers) SiteReplicationDevNull(w http.ResponseWriter, r *http.Request) { + globalSiteNetPerfRX.Connect() + defer globalSiteNetPerfRX.Disconnect() + + connectTime := time.Now() + ctx := newContext(r, w, "SiteReplicationDevNull") + for { + n, err := io.CopyN(io.Discard, r.Body, 128*humanize.KiByte) + atomic.AddUint64(&globalSiteNetPerfRX.RX, uint64(n)) + if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { + // If there is a disconnection before globalNetPerfMinDuration (we give a margin of error of 1 sec) + // would mean the network is not stable. 
Logging here will help in debugging network issues. + if time.Since(connectTime) < (globalNetPerfMinDuration - time.Second) { + logger.LogIf(ctx, err) + } + } + if err != nil { + if errors.Is(err, io.EOF) { + w.WriteHeader(http.StatusNoContent) + } else { + w.WriteHeader(http.StatusBadRequest) + } + break + } + } +} + +// SiteReplicationNetPerf - runs the site replication network perf test from this site and writes the gob-encoded result +// [POST] /minio/admin/v3/site-replication/netperf +func (a *adminAPIHandlers) SiteReplicationNetPerf(w http.ResponseWriter, r *http.Request) { + durationStr := r.Form.Get(peerRESTDuration) + duration, _ := time.ParseDuration(durationStr) + if duration < globalNetPerfMinDuration { + duration = globalNetPerfMinDuration + } + result := siteNetperf(r.Context(), duration) + logger.LogIf(r.Context(), gob.NewEncoder(w).Encode(result)) +} diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 2f66a73d6..d0bdd36a6 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -1153,6 +1153,49 @@ func (a adminAPIHandlers) BackgroundHealStatusHandler(w http.ResponseWriter, r * } } +// SitePerfHandler - measures network throughput between site replicated setups +func (a adminAPIHandlers) SitePerfHandler(w http.ResponseWriter, r *http.Request) { + ctx := newContext(r, w, "SitePerfHandler") + + defer logger.AuditLog(ctx, w, r, mustGetClaimsFromToken(r)) + + objectAPI, _ := validateAdminReq(ctx, w, r, iampolicy.HealthInfoAdminAction) + if objectAPI == nil { + return + } + + nsLock := objectAPI.NewNSLock(minioMetaBucket, "site-net-perf") + lkctx, err := nsLock.GetLock(ctx, globalOperationTimeout) + if err != nil { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(toAPIErrorCode(ctx, err)), r.URL) + return + } + defer nsLock.Unlock(lkctx) + + durationStr := r.Form.Get(peerRESTDuration) + duration, err := time.ParseDuration(durationStr) + if err != nil { + duration = globalNetPerfMinDuration + } + + if duration < globalNetPerfMinDuration { + // We need sample size of minimum 10 secs. 
+ duration = globalNetPerfMinDuration + } + + duration = duration.Round(time.Second) + + results, err := globalSiteReplicationSys.Netperf(ctx, duration) + if err != nil { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(toAPIErrorCode(ctx, err)), r.URL) + return + } + enc := json.NewEncoder(w) + if err := enc.Encode(results); err != nil { + return + } +} + // NetperfHandler - perform mesh style network throughput test func (a adminAPIHandlers) NetperfHandler(w http.ResponseWriter, r *http.Request) { ctx := newContext(r, w, "NetperfHandler") diff --git a/cmd/admin-router.go b/cmd/admin-router.go index 4def3756d..cc2691d90 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -28,9 +28,11 @@ import ( ) const ( - adminPathPrefix = minioReservedBucketPath + "/admin" - adminAPIVersion = madmin.AdminAPIVersion - adminAPIVersionPrefix = SlashSeparator + adminAPIVersion + adminPathPrefix = minioReservedBucketPath + "/admin" + adminAPIVersion = madmin.AdminAPIVersion + adminAPIVersionPrefix = SlashSeparator + adminAPIVersion + adminAPISiteReplicationDevNull = "/site-replication/devnull" + adminAPISiteReplicationNetPerf = "/site-replication/netperf" ) // adminAPIHandlers provides HTTP handlers for MinIO admin API. 
@@ -260,6 +262,8 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) { adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/info").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationInfo))) adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/metainfo").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationMetaInfo))) adminRouter.Methods(http.MethodGet).Path(adminVersion + "/site-replication/status").HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationStatus))) + adminRouter.Methods(http.MethodPost).Path(adminVersion + adminAPISiteReplicationDevNull).HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationDevNull))) + adminRouter.Methods(http.MethodPost).Path(adminVersion + adminAPISiteReplicationNetPerf).HandlerFunc(gz(httpTraceHdrs(adminAPI.SiteReplicationNetPerf))) adminRouter.Methods(http.MethodPut).Path(adminVersion + "/site-replication/peer/join").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerJoin))) adminRouter.Methods(http.MethodPut).Path(adminVersion+"/site-replication/peer/bucket-ops").HandlerFunc(gz(httpTraceHdrs(adminAPI.SRPeerBucketOps))).Queries("bucket", "{bucket:.*}").Queries("operation", "{operation:.*}") @@ -283,6 +287,7 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) { adminRouter.Methods(http.MethodPost).Path(adminVersion + "/speedtest/object").HandlerFunc(httpTraceHdrs(adminAPI.ObjectSpeedTestHandler)) adminRouter.Methods(http.MethodPost).Path(adminVersion + "/speedtest/drive").HandlerFunc(httpTraceHdrs(adminAPI.DriveSpeedtestHandler)) adminRouter.Methods(http.MethodPost).Path(adminVersion + "/speedtest/net").HandlerFunc(httpTraceHdrs(adminAPI.NetperfHandler)) + adminRouter.Methods(http.MethodPost).Path(adminVersion + "/speedtest/site").HandlerFunc(httpTraceHdrs(adminAPI.SitePerfHandler)) // HTTP Trace adminRouter.Methods(http.MethodGet).Path(adminVersion + "/trace").HandlerFunc(gz(http.HandlerFunc(adminAPI.TraceHandler))) diff --git a/cmd/globals.go 
b/cmd/globals.go index 45f146a7b..040326b9c 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -370,6 +370,7 @@ var ( // Used for collecting stats for netperf globalNetPerfMinDuration = time.Second * 10 globalNetPerfRX netPerfRX + globalSiteNetPerfRX netPerfRX globalObjectPerfBucket = "minio-perf-test-tmp-bucket" globalObjectPerfUserMetadata = "X-Amz-Meta-Minio-Object-Perf" // Clients can set this to bypass S3 API service freeze. Used by object pref tests. diff --git a/cmd/perf-tests.go b/cmd/perf-tests.go index 8b6bb89fb..b28e6e7ee 100644 --- a/cmd/perf-tests.go +++ b/cmd/perf-tests.go @@ -24,6 +24,7 @@ import ( "io" "math/rand" "net/http" + "net/url" "sync" "sync/atomic" "time" @@ -31,6 +32,7 @@ import ( "github.com/dustin/go-humanize" "github.com/minio/madmin-go/v3" "github.com/minio/minio-go/v7" + xhttp "github.com/minio/minio/internal/http" "github.com/minio/pkg/randreader" ) @@ -240,7 +242,7 @@ func (n *netPerfRX) Connect() { n.Lock() defer n.Unlock() n.activeConnections++ - atomic.StoreUint64(&globalNetPerfRX.RX, 0) + atomic.StoreUint64(&n.RX, 0) n.lastToConnect = time.Now() } @@ -338,3 +340,85 @@ func netperf(ctx context.Context, duration time.Duration) madmin.NetperfNodeResu globalNetPerfRX.Reset() return madmin.NetperfNodeResult{Endpoint: "", TX: r.n / uint64(duration.Seconds()), RX: uint64(rx / delta.Seconds()), Error: errStr} } + +func siteNetperf(ctx context.Context, duration time.Duration) madmin.SiteNetPerfNodeResult { + r := &netperfReader{eof: make(chan struct{})} + r.buf = make([]byte, 128*humanize.KiByte) + rand.Read(r.buf) + + clusterInfos, err := globalSiteReplicationSys.GetClusterInfo(ctx) + if err != nil { + return madmin.SiteNetPerfNodeResult{Error: err.Error()} + } + + // Scale the number of connections from 32 -> 4 from small to large clusters. 
+ connectionsPerPeer := 3 + (29+len(clusterInfos.Sites)-1)/len(clusterInfos.Sites) + + errStr := "" + var wg sync.WaitGroup + + for _, info := range clusterInfos.Sites { + // skip self + if globalDeploymentID == info.DeploymentID { + continue + } + info := info + wg.Add(connectionsPerPeer) + for i := 0; i < connectionsPerPeer; i++ { + go func() { + defer wg.Done() + cli, err := globalSiteReplicationSys.getAdminClient(ctx, info.DeploymentID) + if err != nil { + return + } + rp := cli.GetEndpointURL() + reqURL := &url.URL{ + Scheme: rp.Scheme, + Host: rp.Host, + Path: adminPathPrefix + adminAPIVersionPrefix + adminAPISiteReplicationDevNull, + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL.String(), r) + if err != nil { + return + } + client := &http.Client{ + Timeout: duration + 10*time.Second, + Transport: globalRemoteTargetTransport, + } + resp, err := client.Do(req) + if err != nil { + return + } + defer xhttp.DrainBody(resp.Body) + }() + } + } + + time.Sleep(duration) + close(r.eof) + wg.Wait() + for { + if globalSiteNetPerfRX.ActiveConnections() == 0 || contextCanceled(ctx) { + break + } + time.Sleep(time.Second) + } + rx := float64(globalSiteNetPerfRX.RXSample) + delta := globalSiteNetPerfRX.firstToDisconnect.Sub(globalSiteNetPerfRX.lastToConnect) + // If the first disconnected before the last connected, we likely had a network issue. 
+ if delta <= 0 { + rx = 0 + errStr = "detected network disconnections, possibly an unstable network" + } + + globalSiteNetPerfRX.Reset() + return madmin.SiteNetPerfNodeResult{ + Endpoint: "", + TX: r.n, + TXTotalDuration: duration, + RX: uint64(rx), + RXTotalDuration: delta, + Error: errStr, + TotalConn: uint64(connectionsPerPeer), + } +} diff --git a/cmd/site-replication.go b/cmd/site-replication.go index 515945692..9c2037892 100644 --- a/cmd/site-replication.go +++ b/cmd/site-replication.go @@ -22,10 +22,12 @@ import ( "context" "encoding/base64" "encoding/binary" + "encoding/gob" "encoding/json" "encoding/xml" "errors" "fmt" + "net/http" "net/url" "reflect" "runtime" @@ -41,6 +43,7 @@ import ( "github.com/minio/minio-go/v7/pkg/set" "github.com/minio/minio/internal/auth" sreplication "github.com/minio/minio/internal/bucket/replication" + xhttp "github.com/minio/minio/internal/http" "github.com/minio/minio/internal/logger" bktpolicy "github.com/minio/pkg/bucket/policy" iampolicy "github.com/minio/pkg/iam/policy" @@ -647,6 +650,83 @@ func (c *SiteReplicationSys) validateIDPSettings(ctx context.Context, peers []Pe return true, nil } +// Netperf for site-replication net perf +func (c *SiteReplicationSys) Netperf(ctx context.Context, duration time.Duration) (results madmin.SiteNetPerfResult, err error) { + infos, err := globalSiteReplicationSys.GetClusterInfo(ctx) + if err != nil { + return results, err + } + var wg sync.WaitGroup + var resultsMu sync.RWMutex + for _, info := range infos.Sites { + info := info + // for the local site, run siteNetperf directly; it calls the other sites' adminAPISiteReplicationDevNull endpoint + if globalDeploymentID == info.DeploymentID { + wg.Add(1) + go func() (err error) { + defer wg.Done() + result := &madmin.SiteNetPerfNodeResult{} + defer func() { + if err != nil { + result.Error = err.Error() + } + resultsMu.Lock() + results.NodeResults = append(results.NodeResults, *result) + resultsMu.Unlock() + }() + cli, err := globalSiteReplicationSys.getAdminClient(ctx, 
info.DeploymentID) + if err != nil { + return err + } + *result = siteNetperf(ctx, duration) + result.Endpoint = cli.GetEndpointURL().String() + return nil + }() + continue + } + wg.Add(1) + go func() (err error) { + defer wg.Done() + result := madmin.SiteNetPerfNodeResult{} + defer func() { + if err != nil { + result.Error = err.Error() + } + resultsMu.Lock() + results.NodeResults = append(results.NodeResults, result) + resultsMu.Unlock() + }() + cli, err := globalSiteReplicationSys.getAdminClient(ctx, info.DeploymentID) + if err != nil { + return err + } + rp := cli.GetEndpointURL() + reqURL := &url.URL{ + Scheme: rp.Scheme, + Host: rp.Host, + Path: adminPathPrefix + adminAPIVersionPrefix + adminAPISiteReplicationNetPerf, + } + result.Endpoint = rp.String() + req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqURL.String(), nil) + if err != nil { + return err + } + client := &http.Client{ + Timeout: duration + 10*time.Second, + Transport: globalRemoteTargetTransport, + } + resp, err := client.Do(req) + if err != nil { + return err + } + defer xhttp.DrainBody(resp.Body) + return gob.NewDecoder(resp.Body).Decode(&result) + }() + } + wg.Wait() + return +} + // GetClusterInfo - returns site replication information. 
func (c *SiteReplicationSys) GetClusterInfo(ctx context.Context) (info madmin.SiteReplicationInfo, err error) { c.RLock() diff --git a/go.mod b/go.mod index 4b736a252..3a6e45261 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/minio/dperf v0.4.10 github.com/minio/highwayhash v1.0.2 github.com/minio/kes-go v0.1.0 - github.com/minio/madmin-go/v3 v3.0.4 + github.com/minio/madmin-go/v3 v3.0.5 github.com/minio/minio-go/v7 v7.0.58 github.com/minio/mux v1.9.0 github.com/minio/pkg v1.7.5 diff --git a/go.sum b/go.sum index 3ffc57077..63cbf8509 100644 --- a/go.sum +++ b/go.sum @@ -483,8 +483,8 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/minio/kes-go v0.1.0 h1:h201DyOYP5sTqajkxFGxmXz/kPbT8HQNX1uh3Yx2PFc= github.com/minio/kes-go v0.1.0/go.mod h1:VorHLaIYis9/MxAHAtXN4d8PUMNKhIxTIlvFt0hBOEo= -github.com/minio/madmin-go/v3 v3.0.4 h1:nINToRlCFRKKINGIvn+RUkYgnQregTEi6xVO6XwSUtA= -github.com/minio/madmin-go/v3 v3.0.4/go.mod h1:lPrMoc1aeiIWmmrxBthkDqzMPQwC/Lu9ByuyM2wenJk= +github.com/minio/madmin-go/v3 v3.0.5 h1:ynWTsnszHnQVJWRL2OE4ysCvCNG0uHgdTvJpdLazf9c= +github.com/minio/madmin-go/v3 v3.0.5/go.mod h1:lPrMoc1aeiIWmmrxBthkDqzMPQwC/Lu9ByuyM2wenJk= github.com/minio/mc v0.0.0-20230620210040-4b06db8e171f h1:YWTPtK2aX2YF+A6FJ9aoTXY0AvyIkDrAS7iSR03rx+Q= github.com/minio/mc v0.0.0-20230620210040-4b06db8e171f/go.mod h1:tlEqgLNEvJgXIwgpsrx7wuzmy7wNl+Q0E2ekkBf7DGk= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=