From fbe758efa8b2db65a435e365908df51c68045f8d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 8 Apr 2026 11:30:02 -0700 Subject: [PATCH] test: consolidate port allocation into shared test/testutil package (#8982) * test: consolidate port allocation into shared test/testutil package Move duplicated port allocation logic from 15+ test files into a single shared package at test/testutil/. This fixes a port collision bug where independently allocated ports could overlap via the gRPC offset (port+10000), causing weed mini to reject the configuration. The shared package provides: - AllocatePorts: atomic allocation of N unique ports - AllocateMiniPorts/MustFreeMiniPorts: gRPC-offset-aware allocation that prevents port A+10000 == port B collisions - WaitForPort, WaitForService, FindBindIP, WriteIAMConfig, HasDocker * test: address review feedback and fix FUSE build - Revert fuse_integration change: it has its own go.mod and cannot import the shared testutil package - AllocateMiniPorts: hold all listeners open until the entire batch is allocated, preventing race conditions where other processes steal ports - HasDocker: add 5s context timeout to avoid hanging on stalled Docker - WaitForService: only treat 2xx HTTP status codes as ready * test: use global rand in AllocateMiniPorts for better seeding Go 1.20+ auto-seeds the global rand generator. Using it avoids identical sequences when multiple tests call at the same nanosecond. * test: revert WaitForService status code check S3 endpoints return non-2xx (e.g. 403) on bare GET requests, so requiring 2xx caused the S3 integration test to time out. Any HTTP response is sufficient proof that the service is running. * test: fix gofmt formatting in s3tables test files --- test/fuse_integration/framework_test.go | 5 +- test/multi_master/cluster.go | 50 ++------ test/s3/catalog_trino/trino_catalog_test.go | 99 +++------------ .../distributed_lock_cluster_test.go | 22 +--- test/s3/normal/s3_integration_test.go | 71 ++--------- test/s3/policy/policy_test.go | 72 ++--------- test/s3/spark/setup_test.go | 9 +- test/s3tables/catalog/iceberg_catalog_test.go | 116 +++-------------- .../s3tables/catalog_risingwave/setup_test.go | 13 +- test/s3tables/catalog_spark/setup_test.go | 11 +- .../catalog_trino/trino_catalog_test.go | 102 +++------------ test/s3tables/lakekeeper/lakekeeper_test.go | 2 +- .../maintenance_integration_test.go | 24 +--- test/s3tables/polaris/polaris_env_test.go | 2 +- .../sts_integration/sts_integration_test.go | 2 +- .../s3tables_integration_test.go | 32 +---- test/s3tables/testutil/docker.go | 92 -------------- .../weed_mini.go => testutil/helpers.go} | 68 ++++++---- test/testutil/ports.go | 120 ++++++++++++++++++ test/volume_server/framework/cluster.go | 45 +------ test/volume_server/framework/cluster_mixed.go | 7 +- test/volume_server/framework/cluster_multi.go | 7 +- .../framework/cluster_multi_rust.go | 7 +- test/volume_server/framework/cluster_rust.go | 7 +- .../framework/cluster_with_filer.go | 3 +- .../grpc/fetch_remote_s3_test.go | 38 ++---- 26 files changed, 309 insertions(+), 717 deletions(-) delete mode 100644 test/s3tables/testutil/docker.go rename test/{s3tables/testutil/weed_mini.go => testutil/helpers.go} (51%) create mode 100644 test/testutil/ports.go diff --git a/test/fuse_integration/framework_test.go b/test/fuse_integration/framework_test.go index d8e0cd246..e75c83348 100644 --- a/test/fuse_integration/framework_test.go +++ b/test/fuse_integration/framework_test.go @@ -82,12 +82,13 @@ func NewFuseTestFramework(t *testing.T, config *TestConfig) *FuseTestFramework { } } -// freePort asks the OS for a free TCP port. +// freePort asks the OS for a free TCP port in a range where the gRPC +// offset (port + 10000) won't collide with well-known ports. func freePort(t *testing.T) int { t.Helper() const ( minServicePort = 20000 - maxServicePort = 55535 // SeaweedFS gRPC service uses httpPort + 10000. + maxServicePort = 55535 ) portCount := maxServicePort - minServicePort + 1 diff --git a/test/multi_master/cluster.go b/test/multi_master/cluster.go index ac84ddb9c..6e24d50a8 100644 --- a/test/multi_master/cluster.go +++ b/test/multi_master/cluster.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "net" "net/http" "os" "os/exec" @@ -17,6 +16,8 @@ import ( "sync" "testing" "time" + + "github.com/seaweedfs/seaweedfs/test/testutil" ) const ( @@ -73,23 +74,23 @@ func StartMasterCluster(t testing.TB) *MasterCluster { logsDir := filepath.Join(baseDir, "logs") os.MkdirAll(logsDir, 0o755) - // Allocate 3 port pairs (http, grpc) atomically to prevent reuse. - portPairs, err := allocateMultipleMasterPortPairs(3) + // Allocate 3 mini-safe ports (each guarantees port+10000 is also free). + httpPorts, err := testutil.AllocateMiniPorts(3) if err != nil { t.Fatalf("allocate ports: %v", err) } var nodes [3]*masterNode var peerParts []string - for i, pp := range portPairs { + for i, hp := range httpPorts { dataDir := filepath.Join(baseDir, fmt.Sprintf("m%d", i)) os.MkdirAll(dataDir, 0o755) nodes[i] = &masterNode{ - port: pp[0], - grpcPort: pp[1], + port: hp, + grpcPort: hp + testutil.GrpcPortOffset, dataDir: dataDir, logFile: filepath.Join(logsDir, fmt.Sprintf("master%d.log", i)), } - peerParts = append(peerParts, fmt.Sprintf("127.0.0.1:%d", pp[0])) + peerParts = append(peerParts, fmt.Sprintf("127.0.0.1:%d", hp)) } mc := &MasterCluster{ @@ -357,41 +358,6 @@ func (mc *MasterCluster) tailLog(i int) string { return strings.Join(lines, "\n") } -// --- port and binary helpers (adapted from test/volume_server/framework) --- - -// allocateMultipleMasterPortPairs finds n non-overlapping (http, grpc) port -// pairs, holding all listeners until all are found, then releasing them -// together to avoid races between consecutive allocations. -func allocateMultipleMasterPortPairs(n int) ([][2]int, error) { - var listeners []net.Listener - var pairs [][2]int - - defer func() { - for _, l := range listeners { - l.Close() - } - }() - - for masterPort := 10000; masterPort <= 55535 && len(pairs) < n; masterPort++ { - grpcPort := masterPort + 10000 - l1, err := net.Listen("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(masterPort))) - if err != nil { - continue - } - l2, err := net.Listen("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(grpcPort))) - if err != nil { - l1.Close() - continue - } - listeners = append(listeners, l1, l2) - pairs = append(pairs, [2]int{masterPort, grpcPort}) - } - - if len(pairs) < n { - return nil, fmt.Errorf("could only allocate %d of %d master port pairs", len(pairs), n) - } - return pairs, nil -} func findOrBuildWeedBinary() (string, error) { if fromEnv := os.Getenv("WEED_BINARY"); fromEnv != "" { diff --git a/test/s3/catalog_trino/trino_catalog_test.go b/test/s3/catalog_trino/trino_catalog_test.go index cd5a72711..bc0d7dceb 100644 --- a/test/s3/catalog_trino/trino_catalog_test.go +++ b/test/s3/catalog_trino/trino_catalog_test.go @@ -5,7 +5,6 @@ import ( "crypto/rand" "fmt" "io" - "net" "net/http" "os" "os/exec" @@ -13,6 +12,8 @@ import ( "strings" "testing" "time" + + "github.com/seaweedfs/seaweedfs/test/testutil" ) type TestEnvironment struct { @@ -96,29 +97,26 @@ func NewTestEnvironment(t *testing.T) *TestEnvironment { t.Fatalf("Failed to create temp dir: %v", err) } - bindIP := findBindIP() + bindIP := testutil.FindBindIP() - masterPort, masterGrpcPort := mustFreePortPair(t, "Master") - volumePort, volumeGrpcPort := mustFreePortPair(t, "Volume") - filerPort, filerGrpcPort := mustFreePortPair(t, "Filer") - s3Port, s3GrpcPort := mustFreePortPair(t, "S3") - icebergPort := mustFreePort(t, "Iceberg") + // 9 ports: master(2), volume(2), filer(2), s3(2), iceberg(1) + ports := testutil.MustAllocatePorts(t, 9) return &TestEnvironment{ seaweedDir: seaweedDir, weedBinary: weedBinary, dataDir: dataDir, bindIP: bindIP, - s3Port: s3Port, - s3GrpcPort: s3GrpcPort, - icebergPort: icebergPort, - masterPort: masterPort, - masterGrpcPort: masterGrpcPort, - filerPort: filerPort, - filerGrpcPort: filerGrpcPort, - volumePort: volumePort, - volumeGrpcPort: volumeGrpcPort, - dockerAvailable: hasDocker(), + masterPort: ports[0], + masterGrpcPort: ports[1], + volumePort: ports[2], + volumeGrpcPort: ports[3], + filerPort: ports[4], + filerGrpcPort: ports[5], + s3Port: ports[6], + s3GrpcPort: ports[7], + icebergPort: ports[8], + dockerAvailable: testutil.HasDocker(), } } @@ -333,73 +331,6 @@ func createObjectBucket(t *testing.T, env *TestEnvironment, bucketName string) { } } -func hasDocker() bool { - cmd := exec.Command("docker", "version") - return cmd.Run() == nil -} - -func mustFreePort(t *testing.T, name string) int { - t.Helper() - - port, err := getFreePort() - if err != nil { - t.Fatalf("Failed to get free port for %s: %v", name, err) - } - return port -} - -func mustFreePortPair(t *testing.T, name string) (int, int) { - t.Helper() - - httpPort, grpcPort, err := findAvailablePortPair() - if err != nil { - t.Fatalf("Failed to get free port pair for %s: %v", name, err) - } - return httpPort, grpcPort -} - -func findAvailablePortPair() (int, int, error) { - httpPort, err := getFreePort() - if err != nil { - return 0, 0, err - } - grpcPort, err := getFreePort() - if err != nil { - return 0, 0, err - } - return httpPort, grpcPort, nil -} - -func getFreePort() (int, error) { - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return 0, err - } - defer listener.Close() - - addr := listener.Addr().(*net.TCPAddr) - return addr.Port, nil -} - -func findBindIP() string { - addrs, err := net.InterfaceAddrs() - if err != nil { - return "127.0.0.1" - } - for _, addr := range addrs { - ipNet, ok := addr.(*net.IPNet) - if !ok || ipNet.IP == nil { - continue - } - ip := ipNet.IP.To4() - if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() { - continue - } - return ip.String() - } - return "127.0.0.1" -} - func randomString(length int) string { const charset = "abcdefghijklmnopqrstuvwxyz0123456789" b := make([]byte, length) diff --git a/test/s3/distributed_lock/distributed_lock_cluster_test.go b/test/s3/distributed_lock/distributed_lock_cluster_test.go index 6cf1c6a4d..9423feda8 100644 --- a/test/s3/distributed_lock/distributed_lock_cluster_test.go +++ b/test/s3/distributed_lock/distributed_lock_cluster_test.go @@ -20,6 +20,7 @@ import ( "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/seaweedfs/seaweedfs/test/testutil" "github.com/seaweedfs/seaweedfs/test/volume_server/framework" "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -115,7 +116,7 @@ func startDistributedLockCluster(t *testing.T) *distributedLockCluster { require.NoError(t, os.MkdirAll(dir, 0o755), "create %s", dir) } - ports, err := allocatePorts(12) + ports, err := testutil.AllocatePorts(12) require.NoError(t, err, "allocate ports") cluster.masterPort = ports[0] cluster.masterGrpcPort = ports[1] @@ -591,25 +592,6 @@ func (c *distributedLockCluster) tailLog(name string) string { return strings.Join(lines, "\n") } -func allocatePorts(count int) ([]int, error) { - listeners := make([]net.Listener, 0, count) - ports := make([]int, 0, count) - for i := 0; i < count; i++ { - l, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - for _, openListener := range listeners { - _ = openListener.Close() - } - return nil, err - } - listeners = append(listeners, l) - ports = append(ports, l.Addr().(*net.TCPAddr).Port) - } - for _, l := range listeners { - _ = l.Close() - } - return ports, nil -} func stopProcess(cmd *exec.Cmd) { if cmd == nil || cmd.Process == nil { diff --git a/test/s3/normal/s3_integration_test.go b/test/s3/normal/s3_integration_test.go index 6abab8849..07f955503 100644 --- a/test/s3/normal/s3_integration_test.go +++ b/test/s3/normal/s3_integration_test.go @@ -7,7 +7,6 @@ import ( "encoding/base64" "fmt" "math/rand" - "net" "net/http" "os" "os/exec" @@ -25,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/seaweedfs/seaweedfs/test/testutil" "github.com/seaweedfs/seaweedfs/test/volume_server/framework" "github.com/seaweedfs/seaweedfs/weed/command" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -112,37 +112,16 @@ func TestS3Integration(t *testing.T) { } // findAvailablePort finds an available port by binding to port 0 -func findAvailablePort() (int, error) { - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return 0, err - } - defer listener.Close() - - addr := listener.Addr().(*net.TCPAddr) - return addr.Port, nil -} // startMiniCluster starts a weed mini instance directly without exec. // Extra flags (e.g. "-s3.allowDeleteBucketNotEmpty=false") can be appended via extraArgs. func startMiniCluster(t *testing.T, extraArgs ...string) (*TestCluster, error) { - // Find available ports - masterPort, err := findAvailablePort() - if err != nil { - return nil, fmt.Errorf("failed to find master port: %v", err) - } - volumePort, err := findAvailablePort() - if err != nil { - return nil, fmt.Errorf("failed to find volume port: %v", err) - } - filerPort, err := findAvailablePort() - if err != nil { - return nil, fmt.Errorf("failed to find filer port: %v", err) - } - s3Port, err := findAvailablePort() - if err != nil { - return nil, fmt.Errorf("failed to find s3 port: %v", err) - } + // Allocate non-colliding ports (including gRPC offsets) for weed mini. + ports := testutil.MustFreeMiniPorts(t, []string{"Master", "Volume", "Filer", "S3"}) + masterPort := ports[0] + volumePort := ports[1] + filerPort := ports[2] + s3Port := ports[3] // Create temporary directory for test data testDir := t.TempDir() @@ -167,7 +146,7 @@ func startMiniCluster(t *testing.T, extraArgs ...string) (*TestCluster, error) { // Create empty security.toml to disable JWT authentication in tests securityToml := filepath.Join(testDir, "security.toml") - err = os.WriteFile(securityToml, []byte("# Empty security config for testing\n"), 0644) + err := os.WriteFile(securityToml, []byte("# Empty security config for testing\n"), 0644) if err != nil { cancel() return nil, fmt.Errorf("failed to create security.toml: %v", err) @@ -233,10 +212,9 @@ func startMiniCluster(t *testing.T, extraArgs ...string) (*TestCluster, error) { }() // Wait for S3 service to be ready - err = waitForS3Ready(cluster.s3Endpoint, 30*time.Second) - if err != nil { + if !testutil.WaitForService(cluster.s3Endpoint, 30*time.Second) { cancel() - return nil, fmt.Errorf("S3 service failed to start: %v", err) + return nil, fmt.Errorf("S3 service failed to start at %s", cluster.s3Endpoint) } // If VOLUME_SERVER_IMPL=rust, start a Rust volume server alongside weed mini @@ -277,14 +255,12 @@ func (c *TestCluster) startRustVolumeServer(t *testing.T) error { return fmt.Errorf("resolve rust volume binary: %v", err) } - rustVolumePort, err := findAvailablePort() + rustPorts, err := testutil.AllocatePorts(2) if err != nil { - return fmt.Errorf("find rust volume port: %v", err) - } - rustVolumeGrpcPort, err := findAvailablePort() - if err != nil { - return fmt.Errorf("find rust volume grpc port: %v", err) + return fmt.Errorf("find rust volume ports: %v", err) } + rustVolumePort := rustPorts[0] + rustVolumeGrpcPort := rustPorts[1] rustVolumeDir := filepath.Join(c.dataDir, "rust-volume") if err := os.MkdirAll(rustVolumeDir, 0o755); err != nil { @@ -377,25 +353,6 @@ func (c *TestCluster) Stop() { } } -// waitForS3Ready waits for the S3 service to be ready -func waitForS3Ready(endpoint string, timeout time.Duration) error { - client := &http.Client{Timeout: 1 * time.Second} - deadline := time.Now().Add(timeout) - - for time.Now().Before(deadline) { - resp, err := client.Get(endpoint) - if err == nil { - resp.Body.Close() - // Wait a bit more to ensure service is fully ready - time.Sleep(500 * time.Millisecond) - return nil - } - time.Sleep(200 * time.Millisecond) - } - - return fmt.Errorf("timeout waiting for S3 service at %s", endpoint) -} - // Test functions func testCreateBucket(t *testing.T, cluster *TestCluster) { diff --git a/test/s3/policy/policy_test.go b/test/s3/policy/policy_test.go index 8c97f4a58..866615098 100644 --- a/test/s3/policy/policy_test.go +++ b/test/s3/policy/policy_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "net" "net/http" "os" "os/exec" @@ -21,6 +20,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/iam" "github.com/aws/aws-sdk-go/service/s3" + "github.com/seaweedfs/seaweedfs/test/testutil" "github.com/seaweedfs/seaweedfs/test/volume_server/framework" "github.com/seaweedfs/seaweedfs/weed/command" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -703,43 +703,13 @@ func uniqueName(prefix string) string { // --- Test setup helpers --- -func findAvailablePort() (int, error) { - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return 0, err - } - defer listener.Close() - addr := listener.Addr().(*net.TCPAddr) - return addr.Port, nil -} - -// findAvailablePortPair finds an available http port P such that P and P+10000 (grpc) are both available -func findAvailablePortPair() (int, int, error) { - httpPort, err := findAvailablePort() - if err != nil { - return 0, 0, err - } - for i := 0; i < 100; i++ { - grpcPort, err := findAvailablePort() - if err != nil { - return 0, 0, err - } - if grpcPort != httpPort { - return httpPort, grpcPort, nil - } - } - return 0, 0, fmt.Errorf("failed to find available port pair") -} func startMiniCluster(t *testing.T) (*TestCluster, error) { - masterPort, masterGrpcPort, err := findAvailablePortPair() - require.NoError(t, err) - volumePort, volumeGrpcPort, err := findAvailablePortPair() - require.NoError(t, err) - filerPort, filerGrpcPort, err := findAvailablePortPair() - require.NoError(t, err) - s3Port, s3GrpcPort, err := findAvailablePortPair() - require.NoError(t, err) + ports := testutil.MustAllocatePorts(t, 8) + masterPort, masterGrpcPort := ports[0], ports[1] + volumePort, volumeGrpcPort := ports[2], ports[3] + filerPort, filerGrpcPort := ports[4], ports[5] + s3Port, s3GrpcPort := ports[6], ports[7] testDir := t.TempDir() @@ -760,7 +730,7 @@ func startMiniCluster(t *testing.T) (*TestCluster, error) { // Disable authentication for tests securityToml := filepath.Join(testDir, "security.toml") - err = os.WriteFile(securityToml, []byte("# Empty security config\n"), 0644) + err := os.WriteFile(securityToml, []byte("# Empty security config\n"), 0644) require.NoError(t, err) // Configure credential store for IAM tests @@ -819,10 +789,9 @@ enabled = true }() // Wait for S3 - err = waitForS3Ready(cluster.s3Endpoint, 60*time.Second) - if err != nil { + if !testutil.WaitForService(cluster.s3Endpoint, 60*time.Second) { cancel() - return nil, err + return nil, fmt.Errorf("timeout waiting for S3 at %s", cluster.s3Endpoint) } // If VOLUME_SERVER_IMPL=rust, start a Rust volume server alongside weed mini @@ -837,19 +806,6 @@ enabled = true return cluster, nil } -func waitForS3Ready(endpoint string, timeout time.Duration) error { - client := &http.Client{Timeout: 1 * time.Second} - deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) { - resp, err := client.Get(endpoint) - if err == nil { - resp.Body.Close() - return nil - } - time.Sleep(200 * time.Millisecond) - } - return fmt.Errorf("timeout waiting for S3") -} // startRustVolumeServer starts a Rust volume server that registers with the same master. func (c *TestCluster) startRustVolumeServer(t *testing.T) error { @@ -860,14 +816,12 @@ func (c *TestCluster) startRustVolumeServer(t *testing.T) error { return fmt.Errorf("resolve rust volume binary: %v", err) } - rustVolumePort, err := findAvailablePort() + rustPorts, err := testutil.AllocatePorts(2) if err != nil { - return fmt.Errorf("find rust volume port: %v", err) - } - rustVolumeGrpcPort, err := findAvailablePort() - if err != nil { - return fmt.Errorf("find rust volume grpc port: %v", err) + return fmt.Errorf("find rust volume ports: %v", err) } + rustVolumePort := rustPorts[0] + rustVolumeGrpcPort := rustPorts[1] rustVolumeDir := filepath.Join(c.dataDir, "rust-volume") if err := os.MkdirAll(rustVolumeDir, 0o755); err != nil { diff --git a/test/s3/spark/setup_test.go b/test/s3/spark/setup_test.go index 8be585675..efdb2c60d 100644 --- a/test/s3/spark/setup_test.go +++ b/test/s3/spark/setup_test.go @@ -14,7 +14,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/seaweedfs/seaweedfs/test/s3tables/testutil" + "github.com/seaweedfs/seaweedfs/test/testutil" "github.com/testcontainers/testcontainers-go" ) @@ -94,9 +94,10 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { t.Fatalf("failed to create temp directory: %v", err) } - env.masterPort = testutil.MustFreeMiniPort(t, "Master") - env.filerPort = testutil.MustFreeMiniPort(t, "Filer") - env.s3Port = testutil.MustFreeMiniPort(t, "S3") + ports := testutil.MustFreeMiniPorts(t, []string{"Master", "Filer", "S3"}) + env.masterPort = ports[0] + env.filerPort = ports[1] + env.s3Port = ports[2] bindIP := testutil.FindBindIP() iamConfigPath, err := testutil.WriteIAMConfig(env.seaweedfsDataDir, env.accessKey, env.secretKey) diff --git a/test/s3tables/catalog/iceberg_catalog_test.go b/test/s3tables/catalog/iceberg_catalog_test.go index b4e13d5ad..3930d292c 100644 --- a/test/s3tables/catalog/iceberg_catalog_test.go +++ b/test/s3tables/catalog/iceberg_catalog_test.go @@ -9,7 +9,6 @@ import ( "flag" "fmt" "io" - "net" "net/http" "os" "os/exec" @@ -17,6 +16,8 @@ import ( "strings" "testing" "time" + + "github.com/seaweedfs/seaweedfs/test/testutil" ) // sharedEnv is the single TestEnvironment shared across all tests in this package. @@ -69,23 +70,6 @@ type TestEnvironment struct { dockerAvailable bool } -// hasDocker checks if Docker is available -func hasDocker() bool { - cmd := exec.Command("docker", "version") - return cmd.Run() == nil -} - -// getFreePort returns an available ephemeral port and its listener -func getFreePort() (int, net.Listener, error) { - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return 0, nil, err - } - - addr := listener.Addr().(*net.TCPAddr) - return addr.Port, listener, nil -} - // newTestEnvironmentForMain creates a TestEnvironment without calling t.Fatalf so it // can be used from TestMain (which has no *testing.T). func newTestEnvironmentForMain() (*TestEnvironment, error) { @@ -118,95 +102,27 @@ func newTestEnvironmentForMain() (*TestEnvironment, error) { return nil, fmt.Errorf("create temp dir: %w", err) } - // Allocate free ephemeral ports for each service - var listeners []net.Listener - closeListeners := func() { - for _, l := range listeners { - l.Close() - } - } - - var l net.Listener - s3Port, l, err := getFreePort() + // Allocate 9 unique ports atomically: s3, iceberg, s3Grpc, master, masterGrpc, + // filer, filerGrpc, volume, volumeGrpc + ports, err := testutil.AllocatePorts(9) if err != nil { - closeListeners() - return nil, fmt.Errorf("get free port for S3: %w", err) + return nil, fmt.Errorf("allocate ports: %w", err) } - listeners = append(listeners, l) - - icebergPort, l, err := getFreePort() - if err != nil { - closeListeners() - return nil, fmt.Errorf("get free port for Iceberg: %w", err) - } - listeners = append(listeners, l) - - s3GrpcPort, l, err := getFreePort() - if err != nil { - closeListeners() - return nil, fmt.Errorf("get free port for S3 gRPC: %w", err) - } - listeners = append(listeners, l) - - masterPort, l, err := getFreePort() - if err != nil { - closeListeners() - return nil, fmt.Errorf("get free port for Master: %w", err) - } - listeners = append(listeners, l) - - masterGrpcPort, l, err := getFreePort() - if err != nil { - closeListeners() - return nil, fmt.Errorf("get free port for Master gRPC: %w", err) - } - listeners = append(listeners, l) - - filerPort, l, err := getFreePort() - if err != nil { - closeListeners() - return nil, fmt.Errorf("get free port for Filer: %w", err) - } - listeners = append(listeners, l) - - filerGrpcPort, l, err := getFreePort() - if err != nil { - closeListeners() - return nil, fmt.Errorf("get free port for Filer gRPC: %w", err) - } - listeners = append(listeners, l) - - volumePort, l, err := getFreePort() - if err != nil { - closeListeners() - return nil, fmt.Errorf("get free port for Volume: %w", err) - } - listeners = append(listeners, l) - - volumeGrpcPort, l, err := getFreePort() - if err != nil { - closeListeners() - return nil, fmt.Errorf("get free port for Volume gRPC: %w", err) - } - listeners = append(listeners, l) - - // Release the port reservations so weed mini can bind to them - closeListeners() return &TestEnvironment{ seaweedDir: seaweedDir, weedBinary: weedBinary, dataDir: dataDir, - s3Port: s3Port, - s3GrpcPort: s3GrpcPort, - icebergPort: icebergPort, - masterPort: masterPort, - masterGrpcPort: masterGrpcPort, - filerPort: filerPort, - filerGrpcPort: filerGrpcPort, - volumePort: volumePort, - volumeGrpcPort: volumeGrpcPort, - dockerAvailable: hasDocker(), + s3Port: ports[0], + s3GrpcPort: ports[1], + icebergPort: ports[2], + masterPort: ports[3], + masterGrpcPort: ports[4], + filerPort: ports[5], + filerGrpcPort: ports[6], + volumePort: ports[7], + volumeGrpcPort: ports[8], + dockerAvailable: testutil.HasDocker(), }, nil } diff --git a/test/s3tables/catalog_risingwave/setup_test.go b/test/s3tables/catalog_risingwave/setup_test.go index 58c6ed757..781c912f1 100644 --- a/test/s3tables/catalog_risingwave/setup_test.go +++ b/test/s3tables/catalog_risingwave/setup_test.go @@ -18,7 +18,7 @@ import ( v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/seaweedfs/seaweedfs/test/s3tables/testutil" + "github.com/seaweedfs/seaweedfs/test/testutil" ) var ( @@ -114,11 +114,12 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { t.Fatalf("failed to create temp directory: %v", err) } - env.masterPort = testutil.MustFreeMiniPort(t, "Master") - env.filerPort = testutil.MustFreeMiniPort(t, "Filer") - env.s3Port = testutil.MustFreeMiniPort(t, "S3") - env.icebergRestPort = testutil.MustFreeMiniPort(t, "Iceberg") - env.risingwavePort = testutil.MustFreeMiniPort(t, "RisingWave") + ports := testutil.MustFreeMiniPorts(t, []string{"Master", "Filer", "S3", "Iceberg", "RisingWave"}) + env.masterPort = ports[0] + env.filerPort = ports[1] + env.s3Port = ports[2] + env.icebergRestPort = ports[3] + env.risingwavePort = ports[4] env.bindIP = testutil.FindBindIP() diff --git a/test/s3tables/catalog_spark/setup_test.go b/test/s3tables/catalog_spark/setup_test.go index 1b1e323c2..0c25b6828 100644 --- a/test/s3tables/catalog_spark/setup_test.go +++ b/test/s3tables/catalog_spark/setup_test.go @@ -16,7 +16,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/seaweedfs/seaweedfs/test/s3tables/testutil" + "github.com/seaweedfs/seaweedfs/test/testutil" "github.com/testcontainers/testcontainers-go" ) @@ -90,10 +90,11 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { t.Fatalf("failed to create temp directory: %v", err) } - env.masterPort = testutil.MustFreeMiniPort(t, "Master") - env.filerPort = testutil.MustFreeMiniPort(t, "Filer") - env.s3Port = testutil.MustFreeMiniPort(t, "S3") - env.icebergRestPort = testutil.MustFreeMiniPort(t, "Iceberg") + ports := testutil.MustFreeMiniPorts(t, []string{"Master", "Filer", "S3", "Iceberg"}) + env.masterPort = ports[0] + env.filerPort = ports[1] + env.s3Port = ports[2] + env.icebergRestPort = ports[3] bindIP := testutil.FindBindIP() diff --git a/test/s3tables/catalog_trino/trino_catalog_test.go b/test/s3tables/catalog_trino/trino_catalog_test.go index c32b06cc4..e937f66a5 100644 --- a/test/s3tables/catalog_trino/trino_catalog_test.go +++ b/test/s3tables/catalog_trino/trino_catalog_test.go @@ -17,7 +17,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/seaweedfs/seaweedfs/test/s3tables/testutil" + "github.com/seaweedfs/seaweedfs/test/testutil" ) type TestEnvironment struct { @@ -40,7 +40,6 @@ type TestEnvironment struct { dockerAvailable bool accessKey string secretKey string - closers []io.Closer } func TestTrinoIcebergCatalog(t *testing.T) { @@ -214,19 +213,24 @@ func NewTestEnvironment(t *testing.T) *TestEnvironment { bindIP := testutil.FindBindIP() - env := &TestEnvironment{ - seaweedDir: seaweedDir, - weedBinary: weedBinary, - dataDir: dataDir, - bindIP: bindIP, - closers: []io.Closer{}, - } + // 9 ports: master(2), volume(2), filer(2), s3(2), iceberg(1) + ports := testutil.MustAllocatePorts(t, 9) - env.masterPort, env.masterGrpcPort = env.mustFreePortPair("Master") - env.volumePort, env.volumeGrpcPort = env.mustFreePortPair("Volume") - env.filerPort, env.filerGrpcPort = env.mustFreePortPair("Filer") - env.s3Port, env.s3GrpcPort = env.mustFreePortPair("S3") - env.icebergPort = env.mustFreePort("Iceberg") + env := &TestEnvironment{ + seaweedDir: seaweedDir, + weedBinary: weedBinary, + dataDir: dataDir, + bindIP: bindIP, + masterPort: ports[0], + masterGrpcPort: ports[1], + volumePort: ports[2], + volumeGrpcPort: ports[3], + filerPort: ports[4], + filerGrpcPort: ports[5], + s3Port: ports[6], + s3GrpcPort: ports[7], + icebergPort: ports[8], + } env.dockerAvailable = hasDocker() env.accessKey = "AKIAIOSFODNN7EXAMPLE" @@ -235,47 +239,6 @@ func NewTestEnvironment(t *testing.T) *TestEnvironment { return env } -func (env *TestEnvironment) mustFreePort(name string) int { - port, closer, err := getFreePort() - if err != nil { - panic(fmt.Sprintf("Failed to get free port for %s: %v", name, err)) - } - env.closers = append(env.closers, closer) - return port -} - -func (env *TestEnvironment) mustFreePortPair(name string) (int, int) { - httpPort, httpCloser, grpcPort, grpcCloser, err := findAvailablePortPair() - if err != nil { - panic(fmt.Sprintf("Failed to get free port pair for %s: %v", name, err)) - } - env.closers = append(env.closers, httpCloser, grpcCloser) - return httpPort, grpcPort -} - -func mustFreePort(t *testing.T, name string) int { - t.Helper() - - port, closer, err := getFreePort() - if err != nil { - t.Fatalf("Failed to get free port for %s: %v", name, err) - } - closer.Close() - return port -} - -func mustFreePortPair(t *testing.T, name string) (int, int) { - t.Helper() - - httpPort, httpCloser, grpcPort, grpcCloser, err := findAvailablePortPair() - if err != nil { - t.Fatalf("Failed to get free port pair for %s: %v", name, err) - } - httpCloser.Close() - grpcCloser.Close() - return httpPort, grpcPort -} - func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { t.Helper() @@ -293,12 +256,6 @@ func (env *TestEnvironment) StartSeaweedFS(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) env.weedCancel = cancel - // Close all port listeners right before starting the weed process - for _, closer := range env.closers { - closer.Close() - } - env.closers = nil - cmd := exec.CommandContext(ctx, env.weedBinary, "mini", "-master.port", fmt.Sprintf("%d", env.masterPort), "-master.port.grpc", fmt.Sprintf("%d", env.masterGrpcPort), @@ -631,29 +588,6 @@ func hasDocker() bool { return cmd.Run() == nil } -func findAvailablePortPair() (int, io.Closer, int, io.Closer, error) { - httpPort, httpCloser, err := getFreePort() - if err != nil { - return 0, nil, 0, nil, err - } - grpcPort, grpcCloser, err := getFreePort() - if err != nil { - httpCloser.Close() - return 0, nil, 0, nil, err - } - return httpPort, httpCloser, grpcPort, grpcCloser, nil -} - -func getFreePort() (int, io.Closer, error) { - listener, err := net.Listen("tcp", "0.0.0.0:0") - if err != nil { - return 0, nil, err - } - - addr := listener.Addr().(*net.TCPAddr) - return addr.Port, listener, nil -} - func randomString(length int) string { const charset = "abcdefghijklmnopqrstuvwxyz0123456789" b := make([]byte, length) diff --git a/test/s3tables/lakekeeper/lakekeeper_test.go b/test/s3tables/lakekeeper/lakekeeper_test.go index b727d6e69..41059c4ce 100644 --- a/test/s3tables/lakekeeper/lakekeeper_test.go +++ b/test/s3tables/lakekeeper/lakekeeper_test.go @@ -25,7 +25,7 @@ import ( s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/aws-sdk-go-v2/service/sts" - "github.com/seaweedfs/seaweedfs/test/s3tables/testutil" + "github.com/seaweedfs/seaweedfs/test/testutil" "github.com/seaweedfs/seaweedfs/weed/s3api/s3tables" ) diff --git a/test/s3tables/maintenance/maintenance_integration_test.go b/test/s3tables/maintenance/maintenance_integration_test.go index ae1a166c6..f57c2cc4e 100644 --- a/test/s3tables/maintenance/maintenance_integration_test.go +++ b/test/s3tables/maintenance/maintenance_integration_test.go @@ -16,7 +16,6 @@ import ( "flag" "fmt" "io" - "net" "net/http" "os" "path" @@ -40,6 +39,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "github.com/seaweedfs/seaweedfs/test/testutil" "github.com/seaweedfs/seaweedfs/weed/command" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -92,7 +92,7 @@ func TestMain(m *testing.M) { } func startCluster(testDir string, extraArgs []string) (*testCluster, error) { - ports, err := findPorts(10) + ports, err := testutil.AllocatePorts(10) if err != nil { return nil, err } @@ -201,26 +201,6 @@ func (c *testCluster) filerConn(t *testing.T) (*grpc.ClientConn, filer_pb.Seawee return conn, filer_pb.NewSeaweedFilerClient(conn) } -func findPorts(n int) ([]int, error) { - ls := make([]*net.TCPListener, n) - ps := make([]int, n) - for i := 0; i < n; i++ { - l, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - for j := 0; j < i; j++ { - ls[j].Close() - } - return nil, err - } - ls[i] = l.(*net.TCPListener) - ps[i] = ls[i].Addr().(*net.TCPAddr).Port - } - for _, l := range ls { - l.Close() - } - return ps, nil -} - func waitReady(endpoint string, timeout time.Duration) error { client := &http.Client{Timeout: 1 * time.Second} deadline := time.Now().Add(timeout) diff --git a/test/s3tables/polaris/polaris_env_test.go b/test/s3tables/polaris/polaris_env_test.go index e59a4c0ab..fe286e18a 100644 --- a/test/s3tables/polaris/polaris_env_test.go +++ b/test/s3tables/polaris/polaris_env_test.go @@ -19,7 +19,7 @@ import ( "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/seaweedfs/seaweedfs/test/s3tables/testutil" + "github.com/seaweedfs/seaweedfs/test/testutil" ) const ( diff --git a/test/s3tables/sts_integration/sts_integration_test.go b/test/s3tables/sts_integration/sts_integration_test.go index c29e6027d..6b31eb53f 100644 --- a/test/s3tables/sts_integration/sts_integration_test.go +++ b/test/s3tables/sts_integration/sts_integration_test.go @@ -9,7 +9,7 @@ import ( "testing" "time" - "github.com/seaweedfs/seaweedfs/test/s3tables/testutil" + "github.com/seaweedfs/seaweedfs/test/testutil" ) // TestEnvironment mirrors the one in trino_catalog_test.go but simplified diff --git a/test/s3tables/table-buckets/s3tables_integration_test.go b/test/s3tables/table-buckets/s3tables_integration_test.go index e971a42fd..f48c3f046 100644 --- a/test/s3tables/table-buckets/s3tables_integration_test.go +++ b/test/s3tables/table-buckets/s3tables_integration_test.go @@ -3,7 +3,6 @@ package s3tables import ( "context" "fmt" - "net" "net/http" "os" "path/filepath" @@ -18,6 +17,7 @@ import ( "flag" + "github.com/seaweedfs/seaweedfs/test/testutil" "github.com/seaweedfs/seaweedfs/weed/command" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/s3api/s3tables" @@ -590,40 +590,12 @@ func testTargetOperations(t *testing.T, client *S3TablesClient) { // Helper functions -// findAvailablePorts finds n available ports by binding to port 0 multiple times -// It keeps the listeners open until all ports are found to ensure uniqueness -func findAvailablePorts(n int) ([]int, error) { - listeners := make([]*net.TCPListener, n) - ports := make([]int, n) - - // Open all listeners to ensure we get unique ports - for i := 0; i < n; i++ { - listener, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - // Close valid listeners before returning error - for j := 0; j < i; j++ { - listeners[j].Close() - } - return nil, err - } - listeners[i] = listener.(*net.TCPListener) - ports[i] = listeners[i].Addr().(*net.TCPAddr).Port - } - - // Close all listeners - for _, l := range listeners { - l.Close() - } - - return ports, nil -} - // startMiniClusterInDir starts a weed mini instance using testDir as the data // directory. It does not require a *testing.T so it can be called from TestMain. // extraArgs are appended to the default mini command flags. func startMiniClusterInDir(testDir string, extraArgs []string) (*TestCluster, error) { // We need 10 unique ports: Master(2), Volume(2), Filer(2), S3(2), Admin(2) - ports, err := findAvailablePorts(10) + ports, err := testutil.AllocatePorts(10) if err != nil { return nil, fmt.Errorf("failed to find available ports: %v", err) } diff --git a/test/s3tables/testutil/docker.go b/test/s3tables/testutil/docker.go deleted file mode 100644 index e8718922f..000000000 --- a/test/s3tables/testutil/docker.go +++ /dev/null @@ -1,92 +0,0 @@ -package testutil - -import ( - "context" - "fmt" - "net" - "net/http" - "os/exec" - "testing" - "time" -) - -func HasDocker() bool { - cmd := exec.Command("docker", "version") - return cmd.Run() == nil -} - -// MustFreePortPair is a convenience wrapper for tests that only need a single pair. -// Prefer MustAllocatePorts when allocating multiple pairs to guarantee uniqueness. -func MustFreePortPair(t *testing.T, name string) (int, int) { - ports := MustAllocatePorts(t, 2) - return ports[0], ports[1] -} - -// MustAllocatePorts allocates count unique free ports atomically. -// All listeners are held open until every port is obtained, preventing -// the OS from recycling a port between successive allocations. -func MustAllocatePorts(t *testing.T, count int) []int { - t.Helper() - ports, err := AllocatePorts(count) - if err != nil { - t.Fatalf("Failed to allocate %d free ports: %v", count, err) - } - return ports -} - -// AllocatePorts allocates count unique free ports atomically. -func AllocatePorts(count int) ([]int, error) { - listeners := make([]net.Listener, 0, count) - ports := make([]int, 0, count) - for i := 0; i < count; i++ { - l, err := net.Listen("tcp", "0.0.0.0:0") - if err != nil { - for _, ll := range listeners { - _ = ll.Close() - } - return nil, err - } - listeners = append(listeners, l) - ports = append(ports, l.Addr().(*net.TCPAddr).Port) - } - for _, l := range listeners { - _ = l.Close() - } - return ports, nil -} - -func WaitForService(url string, timeout time.Duration) bool { - client := &http.Client{Timeout: 2 * time.Second} - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - ticker := time.NewTicker(500 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return false - case <-ticker.C: - resp, err := client.Get(url) - if err == nil { - resp.Body.Close() - return true - } - } - } -} - -func WaitForPort(port int, timeout time.Duration) bool { - deadline := time.Now().Add(timeout) - address := fmt.Sprintf("127.0.0.1:%d", port) - for time.Now().Before(deadline) { - conn, err := net.DialTimeout("tcp", address, 500*time.Millisecond) - if err == nil { - _ = conn.Close() - return true - } - time.Sleep(100 * time.Millisecond) - } - return false -} diff --git a/test/s3tables/testutil/weed_mini.go b/test/testutil/helpers.go similarity index 51% rename from test/s3tables/testutil/weed_mini.go rename to test/testutil/helpers.go index 1c661b698..7272722ca 100644 --- a/test/s3tables/testutil/weed_mini.go +++ b/test/testutil/helpers.go @@ -1,17 +1,25 @@ package testutil import ( + "context" "fmt" - "math/rand" "net" + "net/http" "os" + "os/exec" "path/filepath" - "testing" "time" ) const SeaweedMiniStartupTimeout = 45 * time.Second +func HasDocker() bool { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + cmd := exec.CommandContext(ctx, "docker", "version") + return cmd.Run() == nil +} + func FindBindIP() string { addrs, err := net.InterfaceAddrs() if err != nil { @@ -60,34 +68,38 @@ func WriteIAMConfig(dir, accessKey, secretKey string) (string, error) { return iamConfigPath, nil } -func MustFreeMiniPort(t *testing.T, name string) int { - t.Helper() +func WaitForService(url string, timeout time.Duration) bool { + client := &http.Client{Timeout: 2 * time.Second} + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() - const ( - minPort = 10000 - maxPort = 55000 - ) - r := rand.New(rand.NewSource(time.Now().UnixNano())) + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() - for i := 0; i < 1000; i++ { - port := minPort + r.Intn(maxPort-minPort) - - listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) - if err != nil { - continue + for { + select { + case <-ctx.Done(): + return false + case <-ticker.C: + resp, err := client.Get(url) + if err == nil { + resp.Body.Close() + return true + } } - _ = listener.Close() - - grpcPort := port + 10000 - grpcListener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", grpcPort)) - if err != nil { - continue - } - _ = grpcListener.Close() - - return port } - - t.Fatalf("failed to get free weed mini port for %s", name) - return 0 +} + +func WaitForPort(port int, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + address := fmt.Sprintf("127.0.0.1:%d", port) + for time.Now().Before(deadline) { + conn, err := net.DialTimeout("tcp", address, 500*time.Millisecond) + if err == nil { + _ = conn.Close() + return true + } + time.Sleep(100 * time.Millisecond) + } + return false } diff --git a/test/testutil/ports.go b/test/testutil/ports.go new file mode 100644 index 000000000..f0c35f11c --- /dev/null +++ b/test/testutil/ports.go @@ -0,0 +1,120 @@ +// Package testutil provides shared test utilities for SeaweedFS integration tests. +package testutil + +import ( + "fmt" + "math/rand" + "net" + "testing" +) + +// GrpcPortOffset is the offset weed mini uses to derive gRPC ports from HTTP ports. +const GrpcPortOffset = 10000 + +// AllocatePorts allocates count unique free ports atomically. +// All listeners are held open until every port is obtained, preventing +// the OS from recycling a port between successive allocations. +func AllocatePorts(count int) ([]int, error) { + listeners := make([]net.Listener, 0, count) + ports := make([]int, 0, count) + for i := 0; i < count; i++ { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + for _, ll := range listeners { + _ = ll.Close() + } + return nil, err + } + listeners = append(listeners, l) + ports = append(ports, l.Addr().(*net.TCPAddr).Port) + } + for _, l := range listeners { + _ = l.Close() + } + return ports, nil +} + +// MustAllocatePorts is a testing wrapper for AllocatePorts. +func MustAllocatePorts(t *testing.T, count int) []int { + t.Helper() + ports, err := AllocatePorts(count) + if err != nil { + t.Fatalf("Failed to allocate %d free ports: %v", count, err) + } + return ports +} + +// AllocateMiniPorts allocates n free ports where each port and its gRPC +// counterpart (port + GrpcPortOffset) are available and don't collide +// with any other allocated port or its gRPC counterpart. All listeners +// are held open until the entire batch is allocated, preventing the OS +// from recycling ports between allocations. Use this when ports will be +// passed to weed mini without explicit gRPC port flags, so mini will +// derive gRPC ports as HTTP + 10000. +func AllocateMiniPorts(count int) ([]int, error) { + const ( + minPort = 10000 + maxPort = 55000 + ) + reserved := make(map[int]bool) + ports := make([]int, 0, count) + var listeners []net.Listener + defer func() { + for _, l := range listeners { + l.Close() + } + }() + + for idx := 0; idx < count; idx++ { + found := false + for i := 0; i < 1000; i++ { + port := minPort + rand.Intn(maxPort-minPort) + grpcPort := port + GrpcPortOffset + + if reserved[port] || reserved[grpcPort] { + continue + } + + l1, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) + if err != nil { + continue + } + + l2, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", grpcPort)) + if err != nil { + l1.Close() + continue + } + + listeners = append(listeners, l1, l2) + reserved[port] = true + reserved[grpcPort] = true + ports = append(ports, port) + found = true + break + } + if !found { + return nil, fmt.Errorf("failed to allocate mini port %d of %d", idx+1, count) + } + } + + return ports, nil +} + +// MustFreeMiniPorts allocates n ports suitable for weed mini, ensuring +// each port's gRPC offset (port + 10000) doesn't collide with any other +// allocated port. names is used only for error messages. +func MustFreeMiniPorts(t *testing.T, names []string) []int { + t.Helper() + ports, err := AllocateMiniPorts(len(names)) + if err != nil { + t.Fatalf("failed to allocate mini ports for %v: %v", names, err) + } + return ports +} + +// MustFreeMiniPort allocates a single weed mini port. +func MustFreeMiniPort(t *testing.T, name string) int { + t.Helper() + return MustFreeMiniPorts(t, []string{name})[0] +} diff --git a/test/volume_server/framework/cluster.go b/test/volume_server/framework/cluster.go index 1f9d30740..c84ebdae2 100644 --- a/test/volume_server/framework/cluster.go +++ b/test/volume_server/framework/cluster.go @@ -18,6 +18,7 @@ import ( "testing" "time" + "github.com/seaweedfs/seaweedfs/test/testutil" "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" ) @@ -84,12 +85,14 @@ func StartSingleVolumeCluster(t testing.TB, profile matrix.Profile) *Cluster { t.Fatalf("write security config: %v", err) } - masterPort, masterGrpcPort, err := allocateMasterPortPair() + miniPorts, err := testutil.AllocateMiniPorts(1) if err != nil { t.Fatalf("allocate master port pair: %v", err) } + masterPort := miniPorts[0] + masterGrpcPort := masterPort + testutil.GrpcPortOffset - ports, err := allocatePorts(3) + ports, err := testutil.AllocatePorts(3) if err != nil { t.Fatalf("allocate ports: %v", err) } @@ -269,44 +272,6 @@ func stopProcess(cmd *exec.Cmd) { } } -func allocatePorts(count int) ([]int, error) { - listeners := make([]net.Listener, 0, count) - ports := make([]int, 0, count) - for i := 0; i < count; i++ { - l, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - for _, ll := range listeners { - _ = ll.Close() - } - return nil, err - } - listeners = append(listeners, l) - ports = append(ports, l.Addr().(*net.TCPAddr).Port) - } - for _, l := range listeners { - _ = l.Close() - } - return ports, nil -} - -func allocateMasterPortPair() (int, int, error) { - for masterPort := 10000; masterPort <= 55535; masterPort++ { - masterGrpcPort := masterPort + 10000 - l1, err := net.Listen("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(masterPort))) - if err != nil { - continue - } - l2, err := net.Listen("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(masterGrpcPort))) - if err != nil { - _ = l1.Close() - continue - } - _ = l2.Close() - _ = l1.Close() - return masterPort, masterGrpcPort, nil - } - return 0, 0, errors.New("unable to find available master port pair") -} func newWorkDir() (dir string, keepLogs bool, err error) { keepLogs = os.Getenv("VOLUME_SERVER_IT_KEEP_LOGS") == "1" diff --git a/test/volume_server/framework/cluster_mixed.go b/test/volume_server/framework/cluster_mixed.go index 17376a842..46189c93b 100644 --- a/test/volume_server/framework/cluster_mixed.go +++ b/test/volume_server/framework/cluster_mixed.go @@ -10,6 +10,7 @@ import ( "sync" "testing" + "github.com/seaweedfs/seaweedfs/test/testutil" "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" ) @@ -92,17 +93,19 @@ func StartMixedVolumeCluster(t testing.TB, profile matrix.Profile, goCount, rust t.Fatalf("write security config: %v", err) } - masterPort, masterGrpcPort, err := allocateMasterPortPair() + miniPorts, err := testutil.AllocateMiniPorts(1) if err != nil { t.Fatalf("allocate master port pair: %v", err) } + masterPort := miniPorts[0] + masterGrpcPort := masterPort + testutil.GrpcPortOffset // 2 ports per server (admin, grpc); add 1 more when public port is split out. portsPerServer := 2 if profile.SplitPublicPort { portsPerServer = 3 } - ports, err := allocatePorts(total * portsPerServer) + ports, err := testutil.AllocatePorts(total * portsPerServer) if err != nil { t.Fatalf("allocate volume ports: %v", err) } diff --git a/test/volume_server/framework/cluster_multi.go b/test/volume_server/framework/cluster_multi.go index 152f57f6d..84a82f067 100644 --- a/test/volume_server/framework/cluster_multi.go +++ b/test/volume_server/framework/cluster_multi.go @@ -10,6 +10,7 @@ import ( "sync" "testing" + "github.com/seaweedfs/seaweedfs/test/testutil" "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" ) @@ -74,10 +75,12 @@ func StartMultiVolumeCluster(t testing.TB, profile matrix.Profile, serverCount i t.Fatalf("write security config: %v", err) } - masterPort, masterGrpcPort, err := allocateMasterPortPair() + miniPorts, err := testutil.AllocateMiniPorts(1) if err != nil { t.Fatalf("allocate master port pair: %v", err) } + masterPort := miniPorts[0] + masterGrpcPort := masterPort + testutil.GrpcPortOffset // Allocate ports for all volume servers (3 ports per server: admin, grpc, public) // If SplitPublicPort is true, we need an additional port per server @@ -86,7 +89,7 @@ func StartMultiVolumeCluster(t testing.TB, profile matrix.Profile, serverCount i portsPerServer = 4 } totalPorts := serverCount * portsPerServer - ports, err := allocatePorts(totalPorts) + ports, err := testutil.AllocatePorts(totalPorts) if err != nil { t.Fatalf("allocate volume ports: %v", err) } diff --git a/test/volume_server/framework/cluster_multi_rust.go b/test/volume_server/framework/cluster_multi_rust.go index 45b9572ae..18030330a 100644 --- a/test/volume_server/framework/cluster_multi_rust.go +++ b/test/volume_server/framework/cluster_multi_rust.go @@ -10,6 +10,7 @@ import ( "sync" "testing" + "github.com/seaweedfs/seaweedfs/test/testutil" "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" ) @@ -85,10 +86,12 @@ func StartRustMultiVolumeCluster(t testing.TB, profile matrix.Profile, serverCou t.Fatalf("write security config: %v", err) } - masterPort, masterGrpcPort, err := allocateMasterPortPair() + miniPorts, err := testutil.AllocateMiniPorts(1) if err != nil { t.Fatalf("allocate master port pair: %v", err) } + masterPort := miniPorts[0] + masterGrpcPort := masterPort + testutil.GrpcPortOffset // Allocate ports for all volume servers (3 ports per server: admin, grpc, public) // If SplitPublicPort is true, we need an additional port per server @@ -97,7 +100,7 @@ func StartRustMultiVolumeCluster(t testing.TB, profile matrix.Profile, serverCou portsPerServer = 4 } totalPorts := serverCount * portsPerServer - ports, err := allocatePorts(totalPorts) + ports, err := testutil.AllocatePorts(totalPorts) if err != nil { t.Fatalf("allocate volume ports: %v", err) } diff --git a/test/volume_server/framework/cluster_rust.go b/test/volume_server/framework/cluster_rust.go index 5d5f56a14..2c786c1f0 100644 --- a/test/volume_server/framework/cluster_rust.go +++ b/test/volume_server/framework/cluster_rust.go @@ -12,6 +12,7 @@ import ( "sync" "testing" + "github.com/seaweedfs/seaweedfs/test/testutil" "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" ) @@ -79,12 +80,14 @@ func StartRustVolumeCluster(t testing.TB, profile matrix.Profile) *RustCluster { t.Fatalf("write security config: %v", err) } - masterPort, masterGrpcPort, err := allocateMasterPortPair() + miniPorts, err := testutil.AllocateMiniPorts(1) if err != nil { t.Fatalf("allocate master port pair: %v", err) } + masterPort := miniPorts[0] + masterGrpcPort := masterPort + testutil.GrpcPortOffset - ports, err := allocatePorts(3) + ports, err := testutil.AllocatePorts(3) if err != nil { t.Fatalf("allocate ports: %v", err) } diff --git a/test/volume_server/framework/cluster_with_filer.go b/test/volume_server/framework/cluster_with_filer.go index 67f35fd26..eaaa4a20f 100644 --- a/test/volume_server/framework/cluster_with_filer.go +++ b/test/volume_server/framework/cluster_with_filer.go @@ -9,6 +9,7 @@ import ( "strconv" "testing" + "github.com/seaweedfs/seaweedfs/test/testutil" "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" ) @@ -25,7 +26,7 @@ func StartSingleVolumeClusterWithFiler(t testing.TB, profile matrix.Profile) *Cl baseCluster := StartSingleVolumeCluster(t, profile) - ports, err := allocatePorts(2) + ports, err := testutil.AllocatePorts(2) if err != nil { t.Fatalf("allocate filer ports: %v", err) } diff --git a/test/volume_server/grpc/fetch_remote_s3_test.go b/test/volume_server/grpc/fetch_remote_s3_test.go index bd1c94cbc..d3c9aa673 100644 --- a/test/volume_server/grpc/fetch_remote_s3_test.go +++ b/test/volume_server/grpc/fetch_remote_s3_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "net" "os" "os/exec" "path/filepath" @@ -17,6 +16,7 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" + "github.com/seaweedfs/seaweedfs/test/testutil" "github.com/seaweedfs/seaweedfs/test/volume_server/framework" "github.com/seaweedfs/seaweedfs/test/volume_server/matrix" "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" @@ -24,29 +24,6 @@ import ( ) // findAvailablePort finds a free TCP port on localhost. -func findAvailablePort() (int, error) { - l, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - return 0, err - } - port := l.Addr().(*net.TCPAddr).Port - l.Close() - return port, nil -} - -// waitForPort waits until a TCP port is listening, up to timeout. -func waitForPort(port int, timeout time.Duration) error { - deadline := time.Now().Add(timeout) - for time.Now().Before(deadline) { - conn, err := net.DialTimeout("tcp", fmt.Sprintf("127.0.0.1:%d", port), 500*time.Millisecond) - if err == nil { - conn.Close() - return nil - } - time.Sleep(200 * time.Millisecond) - } - return fmt.Errorf("port %d not listening after %v", port, timeout) -} // startWeedMini starts a weed mini subprocess and returns the S3 endpoint and cleanup func. func startWeedMini(t *testing.T) (s3Endpoint string, cleanup func()) { @@ -60,10 +37,11 @@ func startWeedMini(t *testing.T) (s3Endpoint string, cleanup func()) { } } - miniMasterPort, _ := findAvailablePort() - miniVolumePort, _ := findAvailablePort() - miniFilerPort, _ := findAvailablePort() - miniS3Port, _ := findAvailablePort() + miniPorts := testutil.MustFreeMiniPorts(t, []string{"Master", "Volume", "Filer", "S3"}) + miniMasterPort := miniPorts[0] + miniVolumePort := miniPorts[1] + miniFilerPort := miniPorts[2] + miniS3Port := miniPorts[3] miniDir := t.TempDir() os.WriteFile(filepath.Join(miniDir, "security.toml"), []byte("# empty\n"), 0644) @@ -89,11 +67,11 @@ func startWeedMini(t *testing.T) (s3Endpoint string, cleanup func()) { t.Fatalf("start weed mini: %v", err) } - if err := waitForPort(miniS3Port, 30*time.Second); err != nil { + if !testutil.WaitForPort(miniS3Port, 30*time.Second) { cancel() miniCmd.Wait() logFile.Close() - t.Fatalf("weed mini S3 not ready: %v", err) + t.Fatalf("weed mini S3 not ready on port %d", miniS3Port) } t.Logf("weed mini S3 ready on port %d", miniS3Port)