diff --git a/weed/command/mini.go b/weed/command/mini.go index fc359f904..f7ff2fa0e 100644 --- a/weed/command/mini.go +++ b/weed/command/mini.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "strings" + "sync" "time" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -44,7 +45,6 @@ const ( minVolumeSizeMB = 64 // Minimum volume size in MB defaultMiniVolumeSizeMB = 128 // Default volume size for mini mode maxVolumeSizeMB = 1024 // Maximum volume size in MB (1GB) - GrpcPortOffset = 10000 // Offset used to calculate gRPC port from HTTP port ) var ( @@ -55,8 +55,6 @@ var ( miniWebDavOptions WebDavOption miniAdminOptions AdminOptions createdInitialIAM bool // Track if initial IAM config was created from env vars - // Track which port flags were explicitly passed on CLI before config file is applied - explicitPortFlags map[string]bool ) func init() { @@ -120,15 +118,6 @@ var ( miniS3AllowDeleteBucketNotEmpty = cmdMini.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket") ) -// getBindIp determines the bind IP address based on miniIp and miniBindIp flags -// Returns miniBindIp if set (non-empty), otherwise returns miniIp -func getBindIp() string { - if *miniBindIp != "" { - return *miniBindIp - } - return *miniIp -} - // initMiniCommonFlags initializes common mini flags func initMiniCommonFlags() { miniOptions.cpuprofile = cmdMini.Flag.String("cpuprofile", "", "cpu profile output file") @@ -254,7 +243,7 @@ func initMiniWebDAVFlags() { // initMiniAdminFlags initializes Admin server flag options func initMiniAdminFlags() { miniAdminOptions.port = cmdMini.Flag.Int("admin.port", 23646, "admin server http listen port") - miniAdminOptions.grpcPort = cmdMini.Flag.Int("admin.port.grpc", 0, "admin server grpc listen port (default: admin http port + GrpcPortOffset)") + miniAdminOptions.grpcPort = cmdMini.Flag.Int("admin.port.grpc", 0, "admin server grpc listen port (default: admin http port + 10000)") miniAdminOptions.master = cmdMini.Flag.String("admin.master", "", "master server address (automatically set)") miniAdminOptions.dataDir = cmdMini.Flag.String("admin.dataDir", "", "directory to store admin configuration and data files") miniAdminOptions.adminUser = cmdMini.Flag.String("admin.user", "admin", "admin interface username") @@ -338,6 +327,11 @@ func isFlagPassed(name string) bool { return found } +// isPortOpen checks if a port is available for binding on a specific IP address +func isPortOpen(port int) bool { + return isPortOpenOnIP("127.0.0.1", port) +} + // isPortOpenOnIP checks if a port is available for binding on a specific IP address func isPortOpenOnIP(ip string, port int) bool { listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", ip, port)) @@ -360,146 +354,146 @@ func isPortAvailable(port int) bool { return true } +// findAvailablePort finds the next available port starting from the given port +// It returns the first available port found within maxAttempts +func findAvailablePort(startPort int, maxAttempts int) int { + return findAvailablePortOnIP("127.0.0.1", startPort, maxAttempts) +} + // findAvailablePortOnIP finds the next available port on a specific IP starting from the given port -// It skips any ports that are in the reservedPorts map (for gRPC port collision avoidance) -// It returns the first available port found within maxAttempts, or 0 if none found -func findAvailablePortOnIP(ip string, startPort int, maxAttempts int, reservedPorts map[int]bool) int { +// It returns the first available port found within maxAttempts +func findAvailablePortOnIP(ip string, startPort int, maxAttempts int) int { for i := 0; i < maxAttempts; i++ { port := startPort + i - // Skip ports reserved for gRPC calculation - if reservedPorts[port] { - continue - } // Check on both the specific IP and on all interfaces for maximum reliability if isPortOpenOnIP(ip, port) && isPortAvailable(port) { return port } } - // If no port found, return 0 to indicate failure - return 0 + // If no port found, return the original (will fail during binding) + return startPort +} + +// ensurePortAvailable ensures a port pointer points to an available port +// If the port is not available, it finds the next available port and updates the pointer +func ensurePortAvailable(portPtr *int, serviceName string) { + ensurePortAvailableOnIP(portPtr, serviceName, "127.0.0.1") } // ensurePortAvailableOnIP ensures a port pointer points to an available port on a specific IP // If the port is not available, it finds the next available port and updates the pointer -// The reservedPorts map contains ports that should not be allocated (for gRPC collision avoidance) -func ensurePortAvailableOnIP(portPtr *int, serviceName string, ip string, reservedPorts map[int]bool, flagName string) error { +func ensurePortAvailableOnIP(portPtr *int, serviceName string, ip string) { if portPtr == nil { - return nil + return } original := *portPtr - - // Check if this port was explicitly specified by the user (from CLI, before config file was applied) - isExplicitPort := explicitPortFlags[flagName] - - // Skip if this port is reserved for gRPC calculation - if reservedPorts[original] { - if isExplicitPort { - return fmt.Errorf("port %d for %s (specified by flag %s) is reserved for gRPC calculation and cannot be used", original, serviceName, flagName) - } - glog.Warningf("Port %d for %s is reserved for gRPC calculation, finding alternative...", original, serviceName) - newPort := findAvailablePortOnIP(ip, original+1, 100, reservedPorts) - if newPort == 0 { - glog.Errorf("Could not find available port for %s starting from %d, will use original %d and fail on binding", serviceName, original+1, original) - } else { - glog.Infof("Port %d for %s is available, using it instead of %d", newPort, serviceName, original) - *portPtr = newPort - } - return nil - } - // Check on both the specific IP and on all interfaces (0.0.0.0) for maximum reliability if !isPortOpenOnIP(ip, original) || !isPortAvailable(original) { - // If explicitly specified, fail immediately with the originally requested port - if isExplicitPort { - return fmt.Errorf("port %d for %s (specified by flag %s) is not available on %s and cannot be used", original, serviceName, flagName, ip) - } - // For default ports, try to find an alternative glog.Warningf("Port %d for %s is not available on %s, finding alternative port...", original, serviceName, ip) - newPort := findAvailablePortOnIP(ip, original+1, 100, reservedPorts) - if newPort == 0 { - glog.Errorf("Could not find available port for %s starting from %d, will use original %d and fail on binding", serviceName, original+1, original) - } else { + newPort := findAvailablePortOnIP(ip, original+1, 100) + if newPort != original { glog.Infof("Port %d for %s is available, using it instead of %d", newPort, serviceName, original) *portPtr = newPort } } else { glog.V(1).Infof("Port %d for %s is available on %s", original, serviceName, ip) } - return nil +} + +// ensureAllPortsAvailable ensures all mini service ports are available +// This should be called before starting any services +func ensureAllPortsAvailable() { + ensureAllPortsAvailableOnIP("127.0.0.1") } // ensureAllPortsAvailableOnIP ensures all mini service ports are available on a specific IP -// Returns an error if an explicitly specified port is unavailable. // This should be called before starting any services -func ensureAllPortsAvailableOnIP(bindIp string) error { +func ensureAllPortsAvailableOnIP(bindIp string) { portConfigs := []struct { - port *int - name string - flagName string - grpcPtr *int + port *int + name string + grpcPtr *int }{ - {miniMasterOptions.port, "Master", "master.port", miniMasterOptions.portGrpc}, - {miniFilerOptions.port, "Filer", "filer.port", miniFilerOptions.portGrpc}, - {miniOptions.v.port, "Volume", "volume.port", miniOptions.v.portGrpc}, - {miniS3Options.port, "S3", "s3.port", miniS3Options.portGrpc}, - {miniWebDavOptions.port, "WebDAV", "webdav.port", nil}, - {miniAdminOptions.port, "Admin", "admin.port", miniAdminOptions.grpcPort}, + {miniMasterOptions.port, "Master", miniMasterOptions.portGrpc}, + {miniFilerOptions.port, "Filer", miniFilerOptions.portGrpc}, + {miniOptions.v.port, "Volume", miniOptions.v.portGrpc}, + {miniS3Options.port, "S3", miniS3Options.portGrpc}, + {miniWebDavOptions.port, "WebDAV", nil}, + {miniAdminOptions.port, "Admin", miniAdminOptions.grpcPort}, } - // First, reserve all gRPC ports that will be calculated to prevent HTTP port allocation from using them - // This prevents collisions like: HTTP port moves to X, then gRPC port is calculated as Y where Y == X - reservedPorts := make(map[int]bool) + // Check all HTTP ports in parallel to be efficient + var wg sync.WaitGroup + for _, config := range portConfigs { - if config.grpcPtr != nil && *config.grpcPtr == 0 { - // This gRPC port will be calculated as httpPort + GrpcPortOffset - calculatedGrpcPort := *config.port + GrpcPortOffset - reservedPorts[calculatedGrpcPort] = true - } + wg.Add(1) + go func(portPtr *int, name string, grpcPtr *int) { + defer wg.Done() + + // Check main port on the specific IP + ensurePortAvailableOnIP(portPtr, name, bindIp) + }(config.port, config.name, config.grpcPtr) } - // Check all HTTP ports sequentially to avoid race conditions - // Each port check and allocation must complete before the next one starts - // to prevent multiple goroutines from claiming the same available port - // Also avoid allocating ports that are reserved for gRPC calculation + wg.Wait() + + // Now handle gRPC ports sequentially after HTTP ports are finalized + // gRPC ports can be: + // 1. Explicitly set by user (any independent value) + // 2. Auto-calculated as httpPort + 10000 if set to 0 + // We need to check availability for both cases for _, config := range portConfigs { - original := *config.port - if err := ensurePortAvailableOnIP(config.port, config.name, bindIp, reservedPorts, config.flagName); err != nil { - return err + if config.grpcPtr == nil { + continue } - // If port was changed, update the reserved gRPC ports mapping - if *config.port != original && config.grpcPtr != nil && *config.grpcPtr == 0 { - delete(reservedPorts, original+GrpcPortOffset) - reservedPorts[*config.port+GrpcPortOffset] = true + + httpPort := *config.port + grpcPort := *config.grpcPtr + + // If gRPC port is 0, it will be auto-calculated later as httpPort + 10000 + // Pre-calculate and check if that value would be available + if grpcPort == 0 { + calculatedGrpcPort := httpPort + 10000 + // Check if the calculated port would be available (on both specific IP and all interfaces) + if !isPortOpenOnIP(bindIp, calculatedGrpcPort) || !isPortAvailable(calculatedGrpcPort) { + glog.Warningf("Calculated gRPC port %d for %s is not available, finding alternative port...", calculatedGrpcPort, config.name) + newPort := findAvailablePortOnIP(bindIp, calculatedGrpcPort+1, 100) + glog.Infof("gRPC port %d for %s is available, using it instead of calculated %d", newPort, config.name, calculatedGrpcPort) + *config.grpcPtr = newPort + } + } else { + // gRPC port was explicitly set by user, check if it's available (on both specific IP and all interfaces) + if !isPortOpenOnIP(bindIp, grpcPort) || !isPortAvailable(grpcPort) { + glog.Warningf("gRPC port %d for %s is not available, finding alternative port...", grpcPort, config.name) + newPort := findAvailablePortOnIP(bindIp, grpcPort+1, 100) + if newPort != grpcPort { + glog.Infof("gRPC port %d for %s is available, using it instead of %d", newPort, config.name, grpcPort) + *config.grpcPtr = newPort + } + } } } - // Initialize all gRPC ports before services start - // This ensures they won't be recalculated and cause conflicts - // All gRPC port handling (calculation, validation, and assignment) is performed exclusively in initializeGrpcPortsOnIP - initializeGrpcPortsOnIP(bindIp) - // Log the final port configuration glog.Infof("Final port configuration - Master: %d, Filer: %d, Volume: %d, S3: %d, WebDAV: %d, Admin: %d", *miniMasterOptions.port, *miniFilerOptions.port, *miniOptions.v.port, *miniS3Options.port, *miniWebDavOptions.port, *miniAdminOptions.port) - // Log gRPC ports too (now finalized) + // Log gRPC ports too glog.Infof("gRPC port configuration - Master: %d, Filer: %d, Volume: %d, S3: %d, Admin: %d", *miniMasterOptions.portGrpc, *miniFilerOptions.portGrpc, *miniOptions.v.portGrpc, *miniS3Options.portGrpc, *miniAdminOptions.grpcPort) - return nil + // Initialize all gRPC ports before services start + // This ensures they won't be recalculated and cause conflicts + initializeGrpcPortsOnIP(bindIp) } // initializeGrpcPortsOnIP initializes all gRPC ports based on their HTTP ports on a specific IP -// If a gRPC port is 0, it will be set to httpPort + GrpcPortOffset +// If a gRPC port is 0, it will be set to httpPort + 10000 // This must be called after HTTP ports are finalized and before services start func initializeGrpcPortsOnIP(bindIp string) { - // Track gRPC ports allocated during this function to prevent collisions between services - // when multiple services need fallback port allocation - allocatedGrpcPorts := make(map[int]bool) - grpcConfigs := []struct { httpPort *int grpcPort *int @@ -519,36 +513,21 @@ func initializeGrpcPortsOnIP(bindIp string) { // If gRPC port is 0, calculate it if *config.grpcPort == 0 { - calculatedPort := *config.httpPort + GrpcPortOffset - // Check if calculated port is available (on both specific IP and all interfaces) - // Also check if it was already allocated to another service in this function - if !isPortOpenOnIP(bindIp, calculatedPort) || !isPortAvailable(calculatedPort) || allocatedGrpcPorts[calculatedPort] { + calculatedPort := *config.httpPort + 10000 + // Double-check if calculated port is available, find alternative if not (check on both specific IP and all interfaces) + if !isPortOpenOnIP(bindIp, calculatedPort) || !isPortAvailable(calculatedPort) { glog.Warningf("Calculated gRPC port %d for %s is not available, finding alternative...", calculatedPort, config.name) - newPort := findAvailablePortOnIP(bindIp, calculatedPort+1, 100, allocatedGrpcPorts) - if newPort == 0 { - glog.Errorf("Could not find available gRPC port for %s starting from %d, will use calculated %d and fail on binding", config.name, calculatedPort+1, calculatedPort) - } else { - calculatedPort = newPort - glog.Infof("gRPC port %d for %s is available, using it instead of calculated %d", newPort, config.name, *config.httpPort+GrpcPortOffset) - } + calculatedPort = findAvailablePortOnIP(bindIp, calculatedPort+1, 100) } *config.grpcPort = calculatedPort - allocatedGrpcPorts[calculatedPort] = true glog.V(1).Infof("%s gRPC port initialized to %d", config.name, calculatedPort) } else { // gRPC port was explicitly set, verify it's still available (check on both specific IP and all interfaces) - // Also check if it was already allocated to another service in this function - if !isPortOpenOnIP(bindIp, *config.grpcPort) || !isPortAvailable(*config.grpcPort) || allocatedGrpcPorts[*config.grpcPort] { + if !isPortOpenOnIP(bindIp, *config.grpcPort) || !isPortAvailable(*config.grpcPort) { glog.Warningf("Explicitly set gRPC port %d for %s is not available, finding alternative...", *config.grpcPort, config.name) - newPort := findAvailablePortOnIP(bindIp, *config.grpcPort+1, 100, allocatedGrpcPorts) - if newPort == 0 { - glog.Errorf("Could not find available gRPC port for %s starting from %d, will use original %d and fail on binding", config.name, *config.grpcPort+1, *config.grpcPort) - } else { - glog.Infof("gRPC port %d for %s is available, using it instead of %d", newPort, config.name, *config.grpcPort) - *config.grpcPort = newPort - } + newPort := findAvailablePortOnIP(bindIp, *config.grpcPort+1, 100) + *config.grpcPort = newPort } - allocatedGrpcPorts[*config.grpcPort] = true } } } @@ -607,11 +586,6 @@ func loadMiniConfigurationFile(dataFolder string) (map[string]string, error) { // applyConfigFileOptions sets command-line flags from loaded configuration file func applyConfigFileOptions(options map[string]string) { for key, value := range options { - // Skip port flags that were explicitly passed on CLI - if explicitPortFlags[key] { - glog.V(2).Infof("Skipping config file option %s=%s (explicitly specified on command line)", key, value) - continue - } // Set the flag value if it hasn't been explicitly set on command line flag := cmdMini.Flag.Lookup(key) if flag != nil { @@ -674,14 +648,6 @@ func saveMiniConfiguration(dataFolder string) error { func runMini(cmd *Command, args []string) bool { - // Capture which port flags were explicitly passed on CLI BEFORE config file is applied - // This is necessary to distinguish user-specified ports from defaults or config file options - explicitPortFlags = make(map[string]bool) - portFlagNames := []string{"master.port", "filer.port", "volume.port", "s3.port", "webdav.port", "admin.port"} - for _, flagName := range portFlagNames { - explicitPortFlags[flagName] = isFlagPassed(flagName) - } - // Load configuration from file if it exists configOptions, err := loadMiniConfigurationFile(*miniDataFolders) if err != nil { @@ -699,14 +665,14 @@ func runMini(cmd *Command, args []string) bool { grace.SetupProfiling(*miniOptions.cpuprofile, *miniOptions.memprofile) - // Determine bind IP - bindIp := getBindIp() + // Determine bind IP (same logic as below) + bindIp := *miniIp + if *miniBindIp != "" { + bindIp = *miniBindIp + } // Ensure all ports are available, find alternatives if needed - if err := ensureAllPortsAvailableOnIP(bindIp); err != nil { - glog.Errorf("Port allocation failed: %v", err) - os.Exit(1) - } + ensureAllPortsAvailableOnIP(bindIp) // Set master.peers to "none" if not specified (single master mode) if *miniMasterOptions.peers == "" { @@ -801,16 +767,13 @@ func runMini(cmd *Command, args []string) bool { // startMiniServices starts all mini services with proper dependency coordination func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) { - // Determine bind IP for health checks - bindIp := getBindIp() - // Start Master server (no dependencies) go startMiniService("Master", func() { startMaster(miniMasterOptions, miniWhiteList) }, *miniMasterOptions.port) // Wait for master to be ready - waitForServiceReady("Master", *miniMasterOptions.port, bindIp) + waitForServiceReady("Master", *miniMasterOptions.port) // Start Volume server (depends on master) go startMiniService("Volume", func() { @@ -819,7 +782,7 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) { }, *miniOptions.v.port) // Wait for volume to be ready - waitForServiceReady("Volume", *miniOptions.v.port, bindIp) + waitForServiceReady("Volume", *miniOptions.v.port) // Start Filer (depends on master and volume) go startMiniService("Filer", func() { @@ -827,7 +790,7 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) { }, *miniFilerOptions.port) // Wait for filer to be ready - waitForServiceReady("Filer", *miniFilerOptions.port, bindIp) + waitForServiceReady("Filer", *miniFilerOptions.port) // Start S3 and WebDAV in parallel (both depend on filer) go startMiniService("S3", func() { @@ -839,8 +802,8 @@ func startMiniServices(miniWhiteList []string, allServicesReady chan struct{}) { }, *miniWebDavOptions.port) // Wait for both S3 and WebDAV to be ready - waitForServiceReady("S3", *miniS3Options.port, bindIp) - waitForServiceReady("WebDAV", *miniWebDavOptions.port, bindIp) + waitForServiceReady("S3", *miniS3Options.port) + waitForServiceReady("WebDAV", *miniWebDavOptions.port) // Start Admin with worker (depends on master, filer, S3, WebDAV) go startMiniAdminWithWorker(allServicesReady) @@ -853,8 +816,8 @@ func startMiniService(name string, fn func(), port int) { } // waitForServiceReady pings the service HTTP endpoint to check if it's ready to accept connections -func waitForServiceReady(name string, port int, bindIp string) { - address := fmt.Sprintf("http://%s:%d", bindIp, port) +func waitForServiceReady(name string, port int) { + address := fmt.Sprintf("http://127.0.0.1:%d", port) maxAttempts := 30 // 30 * 200ms = 6 seconds max wait attempt := 0 client := &http.Client{ @@ -932,28 +895,11 @@ func startMiniAdminWithWorker(allServicesReady chan struct{}) { // Set admin options *miniAdminOptions.master = masterAddr - // gRPC port should have been initialized by ensureAllPortsAvailableOnIP in runMini - // If it's still 0, that indicates a problem with the port initialization sequence - // This defensive fallback handles edge cases where port initialization may have been skipped - // or failed silently (e.g., due to configuration changes or error handling paths) + // Note: gRPC port should already be initialized by ensureAllPortsAvailableOnIP + // only set it here if it's still 0 (shouldn't happen in normal flow) if *miniAdminOptions.grpcPort == 0 { - glog.Warningf("Admin gRPC port was not initialized before startAdminServer, attempting fallback initialization...") - // Use the same availability checking logic as initializeGrpcPortsOnIP - calculatedPort := *miniAdminOptions.port + GrpcPortOffset - if !isPortOpenOnIP(getBindIp(), calculatedPort) || !isPortAvailable(calculatedPort) { - glog.Warningf("Calculated fallback gRPC port %d is not available, finding alternative...", calculatedPort) - newPort := findAvailablePortOnIP(getBindIp(), calculatedPort+1, 100, make(map[int]bool)) - if newPort == 0 { - glog.Errorf("Could not find available gRPC port for Admin starting from %d, will use calculated %d and fail on binding", calculatedPort+1, calculatedPort) - *miniAdminOptions.grpcPort = calculatedPort - } else { - glog.Infof("Fallback: using gRPC port %d for Admin", newPort) - *miniAdminOptions.grpcPort = newPort - } - } else { - *miniAdminOptions.grpcPort = calculatedPort - glog.Infof("Fallback: Admin gRPC port initialized to %d", calculatedPort) - } + *miniAdminOptions.grpcPort = *miniAdminOptions.port + 10000 + glog.V(1).Infof("Admin gRPC port was 0, calculated to %d", *miniAdminOptions.grpcPort) } // Create data directory if specified