package framework

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/test/testutil"
	"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
)

const (
	// defaultWaitTimeout bounds how long the readiness probes
	// (waitForHTTP / waitForTCP) keep retrying before giving up.
	defaultWaitTimeout = 30 * time.Second
	// defaultWaitTick is the pause between two readiness probe attempts.
	defaultWaitTick = 200 * time.Millisecond
	// testVolumeSizeLimitMB is passed to the master as -volumeSizeLimitMB.
	testVolumeSizeLimitMB = 32
)

// Result of the one-time weed binary build performed by
// FindOrBuildWeedBinary; weedBinaryOnce guards the other two variables.
var (
	weedBinaryOnce sync.Once
	weedBinaryPath string
	weedBinaryErr  error
)

// Cluster is a lightweight test harness that runs one SeaweedFS master and
// one volume server as child processes.
type Cluster struct {
	testingTB testing.TB
	profile   matrix.Profile

	// Binary and on-disk layout of the harness.
	weedBinary string
	baseDir    string
	configDir  string
	logsDir    string
	keepLogs   bool

	// Ports the master and volume server are started with. volumePubPort
	// equals volumePort unless the profile asks for a split public port.
	masterPort     int
	masterGrpcPort int
	volumePort     int
	volumeGrpcPort int
	volumePubPort  int

	// Child processes; volumeCmd is nil while the volume server is stopped.
	masterCmd *exec.Cmd
	volumeCmd *exec.Cmd

	volumeDataDirs []string

	// cleanupOnce makes Stop safe to call more than once.
	cleanupOnce sync.Once
}

// StartSingleVolumeCluster boots one master and one volume server that uses
// a single data directory.
func StartSingleVolumeCluster(t testing.TB, profile matrix.Profile) *Cluster {
	const dataDirCount = 1
	return StartSingleVolumeClusterWithDataDirs(t, profile, dataDirCount)
}

// StartSingleVolumeClusterWithDataDirs boots one master and one volume server
// with dataDirCount separate data directories (passed to -dir as a comma list).
// Each directory becomes its own DiskLocation on the volume server, letting
// tests exercise multi-disk EC placement paths.
func StartSingleVolumeClusterWithDataDirs(t testing.TB, profile matrix.Profile, dataDirCount int) *Cluster { t.Helper() if dataDirCount < 1 { t.Fatalf("dataDirCount must be >= 1, got %d", dataDirCount) } weedBinary, err := FindOrBuildWeedBinary() if err != nil { t.Fatalf("resolve weed binary: %v", err) } baseDir, keepLogs, err := newWorkDir() if err != nil { t.Fatalf("create temp test directory: %v", err) } configDir := filepath.Join(baseDir, "config") logsDir := filepath.Join(baseDir, "logs") masterDataDir := filepath.Join(baseDir, "master") volumeDataDirs := make([]string, dataDirCount) // Single-dir layout stays at baseDir/volume so existing fixtures // (CorruptDatFile etc.) that hardcode that path keep working. Only // multi-dir clusters get the "volumeN" layout. for i := 0; i < dataDirCount; i++ { if dataDirCount == 1 { volumeDataDirs[i] = filepath.Join(baseDir, "volume") } else { volumeDataDirs[i] = filepath.Join(baseDir, fmt.Sprintf("volume%d", i)) } } setupDirs := append([]string{configDir, logsDir, masterDataDir}, volumeDataDirs...) 
for _, dir := range setupDirs { if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil { t.Fatalf("create %s: %v", dir, mkErr) } } if err = writeSecurityConfig(configDir, profile); err != nil { t.Fatalf("write security config: %v", err) } miniPorts, ports, err := testutil.AllocatePortSet(1, 3) if err != nil { t.Fatalf("allocate ports: %v", err) } masterPort := miniPorts[0] masterGrpcPort := masterPort + testutil.GrpcPortOffset c := &Cluster{ testingTB: t, profile: profile, weedBinary: weedBinary, baseDir: baseDir, configDir: configDir, logsDir: logsDir, keepLogs: keepLogs, masterPort: masterPort, masterGrpcPort: masterGrpcPort, volumePort: ports[0], volumeGrpcPort: ports[1], volumePubPort: ports[0], } if profile.SplitPublicPort { c.volumePubPort = ports[2] } if err = c.startMaster(masterDataDir); err != nil { c.Stop() t.Fatalf("start master: %v", err) } if err = c.waitForHTTP(c.MasterURL() + "/dir/status"); err != nil { masterLog := c.tailLog("master.log") c.Stop() t.Fatalf("wait for master readiness: %v\nmaster log tail:\n%s", err, masterLog) } if err = c.startVolume(volumeDataDirs); err != nil { masterLog := c.tailLog("master.log") c.Stop() t.Fatalf("start volume: %v\nmaster log tail:\n%s", err, masterLog) } c.volumeDataDirs = volumeDataDirs if err = c.waitForHTTP(c.VolumeAdminURL() + "/status"); err != nil { volumeLog := c.tailLog("volume.log") c.Stop() t.Fatalf("wait for volume readiness: %v\nvolume log tail:\n%s", err, volumeLog) } if err = c.waitForTCP(c.VolumeGRPCAddress()); err != nil { volumeLog := c.tailLog("volume.log") c.Stop() t.Fatalf("wait for volume grpc readiness: %v\nvolume log tail:\n%s", err, volumeLog) } t.Cleanup(func() { c.Stop() }) return c } // Stop terminates all processes and cleans temporary files. 
func (c *Cluster) Stop() { if c == nil { return } c.cleanupOnce.Do(func() { stopProcess(c.volumeCmd) stopProcess(c.masterCmd) if !c.keepLogs && !c.testingTB.Failed() { _ = os.RemoveAll(c.baseDir) } else if c.baseDir != "" { c.testingTB.Logf("volume server integration logs kept at %s", c.baseDir) } }) } // RestartVolumeServer kills the volume server and starts a new one against // the same data dirs and ports. The master keeps running. The previous run's // volume.log is moved to volume.log.previous so the first run's logs survive // a second-run startup failure. func (c *Cluster) RestartVolumeServer() { c.testingTB.Helper() stopProcess(c.volumeCmd) c.volumeCmd = nil // Rotate the log; absent on the first call after a clean start, which // is fine — startVolume will create it. Any real filesystem failure // surfaces immediately on the next os.Create in startVolume. oldLog := filepath.Join(c.logsDir, "volume.log") _ = os.Rename(oldLog, filepath.Join(c.logsDir, "volume.log.previous")) if err := c.startVolume(c.volumeDataDirs); err != nil { c.testingTB.Fatalf("restart volume server: %v", err) } if err := c.waitForHTTP(c.VolumeAdminURL() + "/status"); err != nil { c.testingTB.Fatalf("wait for volume admin readiness after restart: %v\nvolume log tail:\n%s", err, c.tailLog("volume.log")) } if err := c.waitForTCP(c.VolumeGRPCAddress()); err != nil { c.testingTB.Fatalf("wait for volume grpc readiness after restart: %v\nvolume log tail:\n%s", err, c.tailLog("volume.log")) } } // StopVolumeServer kills the volume server but leaves the master and data // dirs alone. Pair with RestartVolumeServer or Stop. 
func (c *Cluster) StopVolumeServer() { c.testingTB.Helper() stopProcess(c.volumeCmd) c.volumeCmd = nil } func (c *Cluster) startMaster(dataDir string) error { logFile, err := os.Create(filepath.Join(c.logsDir, "master.log")) if err != nil { return err } args := []string{ "-config_dir=" + c.configDir, "master", "-ip=127.0.0.1", "-port=" + strconv.Itoa(c.masterPort), "-port.grpc=" + strconv.Itoa(c.masterGrpcPort), "-mdir=" + dataDir, "-peers=none", "-volumeSizeLimitMB=" + strconv.Itoa(testVolumeSizeLimitMB), "-defaultReplication=000", } c.masterCmd = exec.Command(c.weedBinary, args...) c.masterCmd.Dir = c.baseDir c.masterCmd.Stdout = logFile c.masterCmd.Stderr = logFile return c.masterCmd.Start() } func (c *Cluster) startVolume(dataDirs []string) error { logFile, err := os.Create(filepath.Join(c.logsDir, "volume.log")) if err != nil { return err } maxPerDir := make([]string, len(dataDirs)) for i := range dataDirs { maxPerDir[i] = "16" } args := []string{ "-config_dir=" + c.configDir, "volume", "-ip=127.0.0.1", "-port=" + strconv.Itoa(c.volumePort), "-port.grpc=" + strconv.Itoa(c.volumeGrpcPort), "-port.public=" + strconv.Itoa(c.volumePubPort), "-dir=" + strings.Join(dataDirs, ","), "-max=" + strings.Join(maxPerDir, ","), "-master=127.0.0.1:" + strconv.Itoa(c.masterPort), "-readMode=" + c.profile.ReadMode, "-concurrentUploadLimitMB=" + strconv.Itoa(c.profile.ConcurrentUploadLimitMB), "-concurrentDownloadLimitMB=" + strconv.Itoa(c.profile.ConcurrentDownloadLimitMB), // Integration tests deliberately exercise loopback S3 endpoints // (the test rig boots weed-mini next to the volume server); allow // the SSRF guard to be bypassed for them. 
"-volume.allowUntrustedRemoteEndpoints", } if c.profile.InflightUploadTimeout > 0 { args = append(args, "-inflightUploadDataTimeout="+c.profile.InflightUploadTimeout.String()) } if c.profile.InflightDownloadTimeout > 0 { args = append(args, "-inflightDownloadDataTimeout="+c.profile.InflightDownloadTimeout.String()) } c.volumeCmd = exec.Command(c.weedBinary, args...) c.volumeCmd.Dir = c.baseDir c.volumeCmd.Stdout = logFile c.volumeCmd.Stderr = logFile return c.volumeCmd.Start() } func (c *Cluster) waitForHTTP(url string) error { client := &http.Client{Timeout: 1 * time.Second} deadline := time.Now().Add(defaultWaitTimeout) for time.Now().Before(deadline) { resp, err := client.Get(url) if err == nil { _, _ = io.Copy(io.Discard, resp.Body) resp.Body.Close() if resp.StatusCode < 500 { return nil } } time.Sleep(defaultWaitTick) } return fmt.Errorf("timed out waiting for %s", url) } func (c *Cluster) waitForTCP(addr string) error { deadline := time.Now().Add(defaultWaitTimeout) for time.Now().Before(deadline) { conn, err := net.DialTimeout("tcp", addr, time.Second) if err == nil { _ = conn.Close() return nil } time.Sleep(defaultWaitTick) } return fmt.Errorf("timed out waiting for tcp %s", addr) } func stopProcess(cmd *exec.Cmd) { if cmd == nil || cmd.Process == nil { return } _ = cmd.Process.Signal(os.Interrupt) done := make(chan error, 1) go func() { done <- cmd.Wait() }() select { case <-time.After(10 * time.Second): _ = cmd.Process.Kill() <-done case <-done: } } func newWorkDir() (dir string, keepLogs bool, err error) { keepLogs = os.Getenv("VOLUME_SERVER_IT_KEEP_LOGS") == "1" dir, err = os.MkdirTemp("", "seaweedfs_volume_server_it_") return dir, keepLogs, err } func writeSecurityConfig(configDir string, profile matrix.Profile) error { var b strings.Builder if profile.EnableJWT { if profile.JWTSigningKey == "" || profile.JWTReadKey == "" { return errors.New("jwt profile requires both write and read keys") } b.WriteString("[jwt.signing]\n") b.WriteString("key = \"") 
b.WriteString(profile.JWTSigningKey) b.WriteString("\"\n") b.WriteString("expires_after_seconds = 60\n\n") b.WriteString("[jwt.signing.read]\n") b.WriteString("key = \"") b.WriteString(profile.JWTReadKey) b.WriteString("\"\n") b.WriteString("expires_after_seconds = 60\n") } if profile.EnableUIAccess { if b.Len() > 0 { b.WriteString("\n") } b.WriteString("[access]\n") b.WriteString("ui = true\n") } if b.Len() == 0 { b.WriteString("# optional security config generated for integration tests\n") } return os.WriteFile(filepath.Join(configDir, "security.toml"), []byte(b.String()), 0o644) } // FindOrBuildWeedBinary returns an executable weed binary, building one when needed. func FindOrBuildWeedBinary() (string, error) { if fromEnv := os.Getenv("WEED_BINARY"); fromEnv != "" { if isExecutableFile(fromEnv) { return fromEnv, nil } return "", fmt.Errorf("WEED_BINARY is set but not executable: %s", fromEnv) } weedBinaryOnce.Do(func() { repoRoot := "" if _, file, _, ok := runtime.Caller(0); ok { repoRoot = filepath.Clean(filepath.Join(filepath.Dir(file), "..", "..", "..")) } if repoRoot == "" { weedBinaryErr = errors.New("unable to detect repository root") return } binDir := filepath.Join(os.TempDir(), "seaweedfs_volume_server_it_bin") if err := os.MkdirAll(binDir, 0o755); err != nil { weedBinaryErr = fmt.Errorf("create binary directory %s: %w", binDir, err) return } binPath := filepath.Join(binDir, "weed") cmd := exec.Command("go", "build", "-o", binPath, ".") cmd.Dir = filepath.Join(repoRoot, "weed") var out bytes.Buffer cmd.Stdout = &out cmd.Stderr = &out if err := cmd.Run(); err != nil { weedBinaryErr = fmt.Errorf("build weed binary: %w\n%s", err, out.String()) return } if !isExecutableFile(binPath) { weedBinaryErr = fmt.Errorf("built weed binary is not executable: %s", binPath) return } weedBinaryPath = binPath }) if weedBinaryErr != nil { return "", weedBinaryErr } return weedBinaryPath, nil } func isExecutableFile(path string) bool { info, err := os.Stat(path) if err != 
nil || info.IsDir() { return false } mode := info.Mode().Perm() return mode&0o111 != 0 } func (c *Cluster) tailLog(logName string) string { f, err := os.Open(filepath.Join(c.logsDir, logName)) if err != nil { return "" } defer f.Close() scanner := bufio.NewScanner(f) lines := make([]string, 0, 40) for scanner.Scan() { lines = append(lines, scanner.Text()) if len(lines) > 40 { lines = lines[1:] } } return strings.Join(lines, "\n") } func (c *Cluster) MasterAddress() string { return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.masterPort)) } func (c *Cluster) VolumeAdminAddress() string { return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePort)) } func (c *Cluster) VolumePublicAddress() string { return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePubPort)) } func (c *Cluster) VolumeGRPCAddress() string { return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumeGrpcPort)) } // VolumeServerAddress returns SeaweedFS server address format: ip:httpPort.grpcPort func (c *Cluster) VolumeServerAddress() string { return fmt.Sprintf("%s.%d", c.VolumeAdminAddress(), c.volumeGrpcPort) } func (c *Cluster) MasterURL() string { return "http://" + c.MasterAddress() } func (c *Cluster) VolumeAdminURL() string { return "http://" + c.VolumeAdminAddress() } func (c *Cluster) VolumePublicURL() string { return "http://" + c.VolumePublicAddress() } func (c *Cluster) BaseDir() string { return c.baseDir } // VolumeDataDirs returns the data directories the volume server was started with. // Index 0 corresponds to DiskLocation 0, index 1 to DiskLocation 1, and so on. // Tests can scan these directories to verify where files physically landed. func (c *Cluster) VolumeDataDirs() []string { return append([]string(nil), c.volumeDataDirs...) }