diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 4c40008cf..671ad1989 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -227,8 +227,6 @@ func (a adminAPIHandlers) ServerUpdateHandler(w http.ResponseWriter, r *http.Req // ServiceHandler - POST /minio/admin/v3/service?action={action} // ---------- // Supports following actions: -// - restart (restarts all the MinIO instances in a setup) -// - stop (stops all the MinIO instances in a setup) // - freeze (freezes all incoming S3 API calls) // - unfreeze (unfreezes previously frozen S3 API calls) func (a adminAPIHandlers) ServiceHandler(w http.ResponseWriter, r *http.Request) { @@ -238,7 +236,85 @@ func (a adminAPIHandlers) ServiceHandler(w http.ResponseWriter, r *http.Request) action := vars["action"] var serviceSig serviceSignal - switch madmin.ServiceAction(action) { + switch act := madmin.ServiceAction(action); act { + case madmin.ServiceActionFreeze: + serviceSig = serviceFreeze + case madmin.ServiceActionUnfreeze: + serviceSig = serviceUnFreeze + default: + process := act == madmin.ServiceActionRestart || act == madmin.ServiceActionStop + if process { + apiErr := errorCodes.ToAPIErr(ErrMalformedPOSTRequest) + apiErr.Description = "process actions are not supported via this API anymore, please upgrade 'mc' or madmin-go to use ServiceV2() API" + writeErrorResponseJSON(ctx, w, apiErr, r.URL) + return + } + logger.LogIf(ctx, fmt.Errorf("Unrecognized service action %s requested", action), logger.Application) + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrMalformedPOSTRequest), r.URL) + return + } + + objectAPI, _ := validateAdminReq(ctx, w, r, policy.ServiceFreezeAdminAction) + if objectAPI == nil { + return + } + + // Notify all other MinIO peers signal service. 
+ for _, nerr := range globalNotificationSys.SignalService(serviceSig) { + if nerr.Err != nil { + logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) + logger.LogIf(ctx, nerr.Err) + } + } + + // Reply to the client before restarting, stopping MinIO server. + writeSuccessResponseHeadersOnly(w) + + switch serviceSig { + case serviceFreeze: + freezeServices() + case serviceUnFreeze: + unfreezeServices() + } +} + +type servicePeerResult struct { + Host string `json:"host"` + Err string `json:"err,omitempty"` + WaitingDrives map[string]DiskMetrics `json:"waitingDrives,omitempty"` +} + +type serviceResult struct { + Action madmin.ServiceAction `json:"action"` + Forced bool `json:"forced"` + DryRun bool `json:"dryRun"` + Results []servicePeerResult `json:"results,omitempty"` +} + +// ServiceV2Handler - POST /minio/admin/v3/service?action={action}&type=2 +// ---------- +// Supports following actions: +// - restart (restarts all the MinIO instances in a setup) +// - stop (stops all the MinIO instances in a setup) +// - freeze (freezes all incoming S3 API calls) +// - unfreeze (unfreezes previously frozen S3 API calls) +// +// This newer API now returns back status per remote peer and local regarding +// if a "restart/stop" was successful or not. Service signal now supports +// a dry-run that helps skip the nodes that may have hung drives. By default +// restart/stop will ignore the servers that are hung on drives. You can use +// 'force' param to force restart even with hung drives if needed. 
+func (a adminAPIHandlers) ServiceV2Handler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + vars := mux.Vars(r) + action := vars["action"] + dryRun := r.Form.Get("dry-run") == "true" + force := r.Form.Get("force") == "true" + + var serviceSig serviceSignal + act := madmin.ServiceAction(action) + switch act { case madmin.ServiceActionRestart: serviceSig = serviceRestart case madmin.ServiceActionStop: @@ -267,15 +343,57 @@ func (a adminAPIHandlers) ServiceHandler(w http.ResponseWriter, r *http.Request) } // Notify all other MinIO peers signal service. - for _, nerr := range globalNotificationSys.SignalService(serviceSig) { - if nerr.Err != nil { - logger.GetReqInfo(ctx).SetTags("peerAddress", nerr.Host.String()) - logger.LogIf(ctx, nerr.Err) + nerrs := globalNotificationSys.SignalServiceV2(serviceSig, dryRun, force) + srvResult := serviceResult{Action: act, Results: make([]servicePeerResult, 0, len(nerrs))} + + process := act == madmin.ServiceActionRestart || act == madmin.ServiceActionStop + var trigger bool + if process { + localhost := globalLocalNodeName + if globalLocalNodeName == "" { + localhost = "127.0.0.1" } + waitingDrives := canWeRestartNode() + srvResult.Results = append(srvResult.Results, servicePeerResult{ + Host: localhost, + WaitingDrives: waitingDrives, + }) + trigger = len(waitingDrives) == 0 || force + } + + for _, nerr := range nerrs { + if nerr.Err != nil && process { + waitingDrives := map[string]DiskMetrics{} + jerr := json.Unmarshal([]byte(nerr.Err.Error()), &waitingDrives) + if jerr == nil { + srvResult.Results = append(srvResult.Results, servicePeerResult{ + Host: nerr.Host.String(), + WaitingDrives: waitingDrives, + }) + continue + } + } + errStr := "" + if nerr.Err != nil { + errStr = nerr.Err.Error() + } + srvResult.Results = append(srvResult.Results, servicePeerResult{ + Host: nerr.Host.String(), + Err: errStr, + }) + } + + srvResult.Forced = force + srvResult.DryRun = dryRun + + buf, err := json.Marshal(srvResult) + 
if err != nil { + writeErrorResponseJSON(ctx, w, toAdminAPIErr(ctx, err), r.URL) + return } // Reply to the client before restarting, stopping MinIO server. - writeSuccessResponseHeadersOnly(w) + writeSuccessResponseJSON(w, buf) switch serviceSig { case serviceFreeze: @@ -283,7 +401,9 @@ func (a adminAPIHandlers) ServiceHandler(w http.ResponseWriter, r *http.Request) case serviceUnFreeze: unfreezeServices() case serviceRestart, serviceStop: - globalServiceSignalCh <- serviceSig + if !dryRun && trigger { + globalServiceSignalCh <- serviceSig + } } } diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index 0e56d70c6..a2b953308 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -168,6 +168,7 @@ func testServiceSignalReceiver(cmd cmdType, t *testing.T) { func getServiceCmdRequest(cmd cmdType, cred auth.Credentials) (*http.Request, error) { queryVal := url.Values{} queryVal.Set("action", string(cmd.toServiceAction())) + queryVal.Set("type", "2") resource := adminPathPrefix + adminAPIVersionPrefix + "/service?" + queryVal.Encode() req, err := newTestRequest(http.MethodPost, resource, 0, nil) if err != nil { @@ -220,12 +221,18 @@ func testServicesCmdHandler(cmd cmdType, t *testing.T) { rec := httptest.NewRecorder() adminTestBed.router.ServeHTTP(rec, req) + resp, _ := io.ReadAll(rec.Body) if rec.Code != http.StatusOK { - resp, _ := io.ReadAll(rec.Body) t.Errorf("Expected to receive %d status code but received %d. Body (%s)", http.StatusOK, rec.Code, string(resp)) } + result := &serviceResult{} + if err := json.Unmarshal(resp, result); err != nil { + t.Error(err) + } + _ = result + // Wait until testServiceSignalReceiver() called in a goroutine quits. 
wg.Wait() } diff --git a/cmd/admin-router.go b/cmd/admin-router.go index 7af68196f..cfbd3ae45 100644 --- a/cmd/admin-router.go +++ b/cmd/admin-router.go @@ -154,8 +154,12 @@ func registerAdminRouter(router *mux.Router, enableConfigOps bool) { } for _, adminVersion := range adminVersions { + // Restart and stop MinIO service type=2 + adminRouter.Methods(http.MethodPost).Path(adminVersion+"/service").HandlerFunc(adminMiddleware(adminAPI.ServiceV2Handler, traceAllFlag)).Queries("action", "{action:.*}", "type", "2") + // Restart and stop MinIO service. adminRouter.Methods(http.MethodPost).Path(adminVersion+"/service").HandlerFunc(adminMiddleware(adminAPI.ServiceHandler, traceAllFlag)).Queries("action", "{action:.*}") + // Update MinIO servers. adminRouter.Methods(http.MethodPost).Path(adminVersion+"/update").HandlerFunc(adminMiddleware(adminAPI.ServerUpdateHandler, traceAllFlag)).Queries("updateURL", "{updateURL:.*}") diff --git a/cmd/notification.go b/cmd/notification.go index 7a1f84f9b..37ed4d72e 100644 --- a/cmd/notification.go +++ b/cmd/notification.go @@ -106,7 +106,7 @@ func (g *NotificationGroup) Go(ctx context.Context, f func() error, index int, a if i == g.retryCount-1 { reqInfo := (&logger.ReqInfo{}).AppendTags("peerAddress", addr.String()) ctx := logger.SetReqInfo(ctx, reqInfo) - logger.LogIf(ctx, err) + logger.LogOnceIf(ctx, err, addr.String()) } // Wait for a minimum of 100ms and dynamically increase this based on number of attempts. 
if i < g.retryCount-1 { @@ -376,7 +376,7 @@ func (sys *NotificationSys) SignalConfigReload(subSys string) []NotificationPeer } client := client ng.Go(GlobalContext, func() error { - return client.SignalService(serviceReloadDynamic, subSys) + return client.SignalService(serviceReloadDynamic, subSys, false, true) }, idx, *client.host) } return ng.Wait() @@ -391,7 +391,23 @@ func (sys *NotificationSys) SignalService(sig serviceSignal) []NotificationPeerE } client := client ng.Go(GlobalContext, func() error { - return client.SignalService(sig, "") + // force == true preserves the current behavior + return client.SignalService(sig, "", false, true) + }, idx, *client.host) + } + return ng.Wait() +} + +// SignalServiceV2 - calls signal service RPC call on all peers with v2 API +func (sys *NotificationSys) SignalServiceV2(sig serviceSignal, dryRun, force bool) []NotificationPeerErr { + ng := WithNPeers(len(sys.peerClients)) + for idx, client := range sys.peerClients { + if client == nil { + continue + } + client := client + ng.Go(GlobalContext, func() error { + return client.SignalService(sig, "", dryRun, force) }, idx, *client.host) } return ng.Wait() @@ -1275,7 +1291,7 @@ func (sys *NotificationSys) ServiceFreeze(ctx context.Context, freeze bool) []No } client := client ng.Go(GlobalContext, func() error { - return client.SignalService(serviceSig, "") + return client.SignalService(serviceSig, "", false, true) }, idx, *client.host) } nerrs := ng.Wait() diff --git a/cmd/peer-rest-client.go b/cmd/peer-rest-client.go index 897aa90f5..7b1839a88 100644 --- a/cmd/peer-rest-client.go +++ b/cmd/peer-rest-client.go @@ -513,9 +513,11 @@ func (client *peerRESTClient) CommitBinary(ctx context.Context) error { } // SignalService - sends signal to peer nodes. 
-func (client *peerRESTClient) SignalService(sig serviceSignal, subSys string) error { +func (client *peerRESTClient) SignalService(sig serviceSignal, subSys string, dryRun, force bool) error { values := make(url.Values) values.Set(peerRESTSignal, strconv.Itoa(int(sig))) + values.Set(peerRESTDryRun, strconv.FormatBool(dryRun)) + values.Set(peerRESTForce, strconv.FormatBool(force)) values.Set(peerRESTSubSys, subSys) respBody, err := client.call(peerRESTMethodSignalService, values, nil, -1) if err != nil { diff --git a/cmd/peer-rest-common.go b/cmd/peer-rest-common.go index a80b89e7c..fcfc1eee3 100644 --- a/cmd/peer-rest-common.go +++ b/cmd/peer-rest-common.go @@ -18,7 +18,7 @@ package cmd const ( - peerRESTVersion = "v34" // Add metrics flag to LocalStorageInfo call + peerRESTVersion = "v35" // Add new service restart behavior peerRESTVersionPrefix = SlashSeparator + peerRESTVersion peerRESTPrefix = minioReservedBucketPath + "/peer" @@ -108,6 +108,8 @@ const ( peerRESTDepID = "depID" peerRESTStartRebalance = "start-rebalance" peerRESTMetrics = "metrics" + peerRESTDryRun = "dry-run" + peerRESTForce = "force" peerRESTListenBucket = "bucket" peerRESTListenPrefix = "prefix" diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 1cf5dc4cf..49debe959 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -20,6 +20,7 @@ package cmd import ( "context" "encoding/gob" + "encoding/json" "errors" "fmt" "io" @@ -863,6 +864,21 @@ func (s *peerRESTServer) CommitBinaryHandler(w http.ResponseWriter, r *http.Requ var errUnsupportedSignal = fmt.Errorf("unsupported signal") +func canWeRestartNode() map[string]DiskMetrics { + errs := make([]error, len(globalLocalDrives)) + infos := make([]DiskInfo, len(globalLocalDrives)) + for i, drive := range globalLocalDrives { + infos[i], errs[i] = drive.DiskInfo(GlobalContext, false) + } + infoMaps := make(map[string]DiskMetrics) + for i := range infos { + if infos[i].Metrics.TotalWaiting >= 1 && errors.Is(errs[i], 
errFaultyDisk) { + infoMaps[infos[i].Endpoint] = infos[i].Metrics + } + } + return infoMaps +} + // SignalServiceHandler - signal service handler. func (s *peerRESTServer) SignalServiceHandler(w http.ResponseWriter, r *http.Request) { if !s.IsValid(w, r) { @@ -883,10 +899,26 @@ func (s *peerRESTServer) SignalServiceHandler(w http.ResponseWriter, r *http.Req } signal := serviceSignal(si) switch signal { - case serviceRestart: - globalServiceSignalCh <- signal - case serviceStop: - globalServiceSignalCh <- signal + case serviceRestart, serviceStop: + dryRun := r.Form.Get("dry-run") == "true" // This is only supported for `restart/stop` + force := r.Form.Get("force") == "true" + + waitingDisks := canWeRestartNode() + if len(waitingDisks) > 0 { + buf, err := json.Marshal(waitingDisks) + if err != nil { + s.writeErrorResponse(w, err) + return + } + s.writeErrorResponse(w, errors.New(string(buf))) + // if it's forced we signal the process anyway. + if !force { + return + } + } + if !dryRun { + globalServiceSignalCh <- signal + } case serviceFreeze: freezeServices() case serviceUnFreeze: