* fix(master): include GrpcPort in LookupEcVolume response

  LookupVolume already passes loc.GrpcPort through to the client; LookupEcVolume builds Location with only Url / PublicUrl / DataCenter, so callers fall back to ServerToGrpcAddress (httpPort + 10000). On any deployment where that convention does not hold — multi-disk integration tests, custom port layouts — EC reads dial the wrong port and quietly degrade to parity recovery.

* fix(volume/ec): probe every DiskLocation when serving local shard reads

  reconcileEcShardsAcrossDisks (issue 9212) registers each .ec?? against the DiskLocation that physically owns it, so a multi-disk volume server can hold shards for the same vid in two separate ecVolumes — one per disk — with .ecx on whichever disk owned the original .dat. The read path only consulted the single EcVolume FindEcVolume picked, so requests for shards on the sibling disk fell through to errShardNotLocal and then to remote/loopback recovery. Walk all DiskLocations after the first probe in both readLocalEcShardInterval and the VolumeEcShardRead gRPC handler; the latter also covers the loopback that recoverOneRemoteEcShardInterval falls back to when a peer dial fails.

* test(volume/ec): cover the multi-disk EC lifecycle end-to-end

  Two integration tests against a real volume server with two data dirs. TestEcLifecycleAcrossMultipleDisks drives encode -> mount -> HTTP read -> drop .dat -> stop -> redistribute shards across disks -> restart -> verify reconcileEcShardsAcrossDisks attached the orphan shards and reads still work -> blob delete -> stop -> drop a shard -> restart -> VolumeEcShardsRebuild pulls input from both disks -> reads still work. TestEcPartialShardsOnSiblingDiskCleanedUpOnRestart is the issue 9478 reproducer at the cluster level: seed a healthy .dat on disk 0, plant the on-disk footprint of an interrupted EC encode on disk 1, restart, and assert pruneIncompleteEcWithSiblingDat wipes disk 1 without touching disk 0. Framework gets RestartVolumeServer / StopVolumeServer helpers; the previous run's volume.log is rotated to volume.log.previous so a startup regression on the second run does not lose the first run's diagnostics.

* review: trim verbose comments

* review: drop racy fast-path, use locked findEcShard directly

  gemini-code-assist flagged the two-step lookup in readLocalEcShardInterval and VolumeEcShardRead: the first probe (ecVolume.FindEcVolumeShard) reads the EcVolume's Shards slice without holding ecVolumesLock, so a concurrent mount / unmount could race with it. findEcShard already walks every DiskLocation under the right lock, so the fast-path adds nothing but the race. Collapse both call sites to a single locked call. Also note in RestartVolumeServer why the log-rotation error is swallowed: absence on first call is benign; anything else surfaces in the next os.Create in startVolume.
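A minimal sketch of how a test might drive the helpers described above. StartSingleVolumeClusterWithDataDirs, RestartVolumeServer, VolumeDataDirs, and VolumePublicURL are the real framework functions from the file below; the package name, the zero-value profile, and the elided blob/EC steps are illustrative only:

package volume_server_test // hypothetical test package name

import (
	"testing"

	"github.com/seaweedfs/seaweedfs/test/volume_server/framework"
	"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
)

func TestMultiDiskEcSketch(t *testing.T) {
	var profile matrix.Profile // illustrative; real tests pick a profile from the matrix package

	// Two data dirs -> two DiskLocations on the volume server.
	c := framework.StartSingleVolumeClusterWithDataDirs(t, profile, 2)

	// ... write blobs, EC-encode the volume, and spread the .ec?? shards
	// across c.VolumeDataDirs()[0] and c.VolumeDataDirs()[1] ...

	// Bounce only the volume server; the master keeps running and the first
	// run's volume.log is preserved as volume.log.previous.
	c.RestartVolumeServer()

	// ... read the blobs back via c.VolumePublicURL() and assert success ...
}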
490 lines · 14 KiB · Go
package framework

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/test/testutil"
	"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
)

const (
	defaultWaitTimeout    = 30 * time.Second
	defaultWaitTick       = 200 * time.Millisecond
	testVolumeSizeLimitMB = 32
)

var (
	weedBinaryOnce sync.Once
	weedBinaryPath string
	weedBinaryErr  error
)

// Cluster is a lightweight SeaweedFS master + one volume server test harness.
type Cluster struct {
	testingTB testing.TB
	profile   matrix.Profile

	weedBinary string
	baseDir    string
	configDir  string
	logsDir    string
	keepLogs   bool

	masterPort     int
	masterGrpcPort int
	volumePort     int
	volumeGrpcPort int
	volumePubPort  int

	masterCmd *exec.Cmd
	volumeCmd *exec.Cmd

	volumeDataDirs []string

	cleanupOnce sync.Once
}

// StartSingleVolumeCluster boots one master and one volume server.
func StartSingleVolumeCluster(t testing.TB, profile matrix.Profile) *Cluster {
	return StartSingleVolumeClusterWithDataDirs(t, profile, 1)
}

// StartSingleVolumeClusterWithDataDirs boots one master and one volume server
// with dataDirCount separate data directories (passed to -dir as a comma list).
// Each directory becomes its own DiskLocation on the volume server, letting
// tests exercise multi-disk EC placement paths.
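// For example, dataDirCount == 2 produces -dir=<baseDir>/volume0,<baseDir>/volume1.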
func StartSingleVolumeClusterWithDataDirs(t testing.TB, profile matrix.Profile, dataDirCount int) *Cluster {
	t.Helper()
	if dataDirCount < 1 {
		t.Fatalf("dataDirCount must be >= 1, got %d", dataDirCount)
	}

	weedBinary, err := FindOrBuildWeedBinary()
	if err != nil {
		t.Fatalf("resolve weed binary: %v", err)
	}

	baseDir, keepLogs, err := newWorkDir()
	if err != nil {
		t.Fatalf("create temp test directory: %v", err)
	}

	configDir := filepath.Join(baseDir, "config")
	logsDir := filepath.Join(baseDir, "logs")
	masterDataDir := filepath.Join(baseDir, "master")
	volumeDataDirs := make([]string, dataDirCount)
	// Single-dir layout stays at baseDir/volume so existing fixtures
	// (CorruptDatFile etc.) that hardcode that path keep working. Only
	// multi-dir clusters get the "volumeN" layout.
	for i := 0; i < dataDirCount; i++ {
		if dataDirCount == 1 {
			volumeDataDirs[i] = filepath.Join(baseDir, "volume")
		} else {
			volumeDataDirs[i] = filepath.Join(baseDir, fmt.Sprintf("volume%d", i))
		}
	}
	setupDirs := append([]string{configDir, logsDir, masterDataDir}, volumeDataDirs...)
	for _, dir := range setupDirs {
		if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil {
			t.Fatalf("create %s: %v", dir, mkErr)
		}
	}

	if err = writeSecurityConfig(configDir, profile); err != nil {
		t.Fatalf("write security config: %v", err)
	}

	miniPorts, ports, err := testutil.AllocatePortSet(1, 3)
	if err != nil {
		t.Fatalf("allocate ports: %v", err)
	}
	masterPort := miniPorts[0]
	masterGrpcPort := masterPort + testutil.GrpcPortOffset

	c := &Cluster{
		testingTB:      t,
		profile:        profile,
		weedBinary:     weedBinary,
		baseDir:        baseDir,
		configDir:      configDir,
		logsDir:        logsDir,
		keepLogs:       keepLogs,
		masterPort:     masterPort,
		masterGrpcPort: masterGrpcPort,
		volumePort:     ports[0],
		volumeGrpcPort: ports[1],
		volumePubPort:  ports[0],
	}
	if profile.SplitPublicPort {
		c.volumePubPort = ports[2]
	}

	if err = c.startMaster(masterDataDir); err != nil {
		c.Stop()
		t.Fatalf("start master: %v", err)
	}
	if err = c.waitForHTTP(c.MasterURL() + "/dir/status"); err != nil {
		masterLog := c.tailLog("master.log")
		c.Stop()
		t.Fatalf("wait for master readiness: %v\nmaster log tail:\n%s", err, masterLog)
	}

	if err = c.startVolume(volumeDataDirs); err != nil {
		masterLog := c.tailLog("master.log")
		c.Stop()
		t.Fatalf("start volume: %v\nmaster log tail:\n%s", err, masterLog)
	}
	c.volumeDataDirs = volumeDataDirs
	if err = c.waitForHTTP(c.VolumeAdminURL() + "/status"); err != nil {
		volumeLog := c.tailLog("volume.log")
		c.Stop()
		t.Fatalf("wait for volume readiness: %v\nvolume log tail:\n%s", err, volumeLog)
	}
	if err = c.waitForTCP(c.VolumeGRPCAddress()); err != nil {
		volumeLog := c.tailLog("volume.log")
		c.Stop()
		t.Fatalf("wait for volume grpc readiness: %v\nvolume log tail:\n%s", err, volumeLog)
	}

	t.Cleanup(func() {
		c.Stop()
	})

	return c
}

// Stop terminates all processes and cleans temporary files.
func (c *Cluster) Stop() {
	if c == nil {
		return
	}
	c.cleanupOnce.Do(func() {
		stopProcess(c.volumeCmd)
		stopProcess(c.masterCmd)
		if !c.keepLogs && !c.testingTB.Failed() {
			_ = os.RemoveAll(c.baseDir)
		} else if c.baseDir != "" {
			c.testingTB.Logf("volume server integration logs kept at %s", c.baseDir)
		}
	})
}

// RestartVolumeServer kills the volume server and starts a new one against
// the same data dirs and ports. The master keeps running. The previous run's
// volume.log is moved to volume.log.previous so the first run's logs survive
// a second-run startup failure.
func (c *Cluster) RestartVolumeServer() {
	c.testingTB.Helper()
	stopProcess(c.volumeCmd)
	c.volumeCmd = nil
	// Rotate the log; absent on the first call after a clean start, which
	// is fine — startVolume will create it. Any real filesystem failure
	// surfaces immediately on the next os.Create in startVolume.
	oldLog := filepath.Join(c.logsDir, "volume.log")
	_ = os.Rename(oldLog, filepath.Join(c.logsDir, "volume.log.previous"))
	if err := c.startVolume(c.volumeDataDirs); err != nil {
		c.testingTB.Fatalf("restart volume server: %v", err)
	}
	if err := c.waitForHTTP(c.VolumeAdminURL() + "/status"); err != nil {
		c.testingTB.Fatalf("wait for volume admin readiness after restart: %v\nvolume log tail:\n%s", err, c.tailLog("volume.log"))
	}
	if err := c.waitForTCP(c.VolumeGRPCAddress()); err != nil {
		c.testingTB.Fatalf("wait for volume grpc readiness after restart: %v\nvolume log tail:\n%s", err, c.tailLog("volume.log"))
	}
}

// StopVolumeServer kills the volume server but leaves the master and data
// dirs alone. Pair with RestartVolumeServer or Stop.
func (c *Cluster) StopVolumeServer() {
	c.testingTB.Helper()
	stopProcess(c.volumeCmd)
	c.volumeCmd = nil
}

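// startMaster launches the weed master process for this cluster and sends its
// combined stdout/stderr to logs/master.log under the cluster base dir.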
func (c *Cluster) startMaster(dataDir string) error {
	logFile, err := os.Create(filepath.Join(c.logsDir, "master.log"))
	if err != nil {
		return err
	}

	args := []string{
		"-config_dir=" + c.configDir,
		"master",
		"-ip=127.0.0.1",
		"-port=" + strconv.Itoa(c.masterPort),
		"-port.grpc=" + strconv.Itoa(c.masterGrpcPort),
		"-mdir=" + dataDir,
		"-peers=none",
		"-volumeSizeLimitMB=" + strconv.Itoa(testVolumeSizeLimitMB),
		"-defaultReplication=000",
	}

	c.masterCmd = exec.Command(c.weedBinary, args...)
	c.masterCmd.Dir = c.baseDir
	c.masterCmd.Stdout = logFile
	c.masterCmd.Stderr = logFile
	return c.masterCmd.Start()
}

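// startVolume launches the weed volume server against dataDirs and sends its
// combined stdout/stderr to logs/volume.log. With two data dirs the assembled
// command looks roughly like this (ports and paths are illustrative):
//
//	weed -config_dir=<cfg> volume -ip=127.0.0.1 -port=8080 -port.grpc=18080 \
//		-port.public=8080 -dir=<dir0>,<dir1> -max=16,16 -master=127.0.0.1:9333 ...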
func (c *Cluster) startVolume(dataDirs []string) error {
	logFile, err := os.Create(filepath.Join(c.logsDir, "volume.log"))
	if err != nil {
		return err
	}

	maxPerDir := make([]string, len(dataDirs))
	for i := range dataDirs {
		maxPerDir[i] = "16"
	}
	args := []string{
		"-config_dir=" + c.configDir,
		"volume",
		"-ip=127.0.0.1",
		"-port=" + strconv.Itoa(c.volumePort),
		"-port.grpc=" + strconv.Itoa(c.volumeGrpcPort),
		"-port.public=" + strconv.Itoa(c.volumePubPort),
		"-dir=" + strings.Join(dataDirs, ","),
		"-max=" + strings.Join(maxPerDir, ","),
		"-master=127.0.0.1:" + strconv.Itoa(c.masterPort),
		"-readMode=" + c.profile.ReadMode,
		"-concurrentUploadLimitMB=" + strconv.Itoa(c.profile.ConcurrentUploadLimitMB),
		"-concurrentDownloadLimitMB=" + strconv.Itoa(c.profile.ConcurrentDownloadLimitMB),
		// Integration tests deliberately exercise loopback S3 endpoints
		// (the test rig boots weed-mini next to the volume server); allow
		// the SSRF guard to be bypassed for them.
		"-volume.allowUntrustedRemoteEndpoints",
	}
	if c.profile.InflightUploadTimeout > 0 {
		args = append(args, "-inflightUploadDataTimeout="+c.profile.InflightUploadTimeout.String())
	}
	if c.profile.InflightDownloadTimeout > 0 {
		args = append(args, "-inflightDownloadDataTimeout="+c.profile.InflightDownloadTimeout.String())
	}

	c.volumeCmd = exec.Command(c.weedBinary, args...)
	c.volumeCmd.Dir = c.baseDir
	c.volumeCmd.Stdout = logFile
	c.volumeCmd.Stderr = logFile
	return c.volumeCmd.Start()
}

func (c *Cluster) waitForHTTP(url string) error {
	client := &http.Client{Timeout: 1 * time.Second}
	deadline := time.Now().Add(defaultWaitTimeout)
	for time.Now().Before(deadline) {
		resp, err := client.Get(url)
		if err == nil {
			_, _ = io.Copy(io.Discard, resp.Body)
			resp.Body.Close()
			if resp.StatusCode < 500 {
				return nil
			}
		}
		time.Sleep(defaultWaitTick)
	}
	return fmt.Errorf("timed out waiting for %s", url)
}

func (c *Cluster) waitForTCP(addr string) error {
	deadline := time.Now().Add(defaultWaitTimeout)
	for time.Now().Before(deadline) {
		conn, err := net.DialTimeout("tcp", addr, time.Second)
		if err == nil {
			_ = conn.Close()
			return nil
		}
		time.Sleep(defaultWaitTick)
	}
	return fmt.Errorf("timed out waiting for tcp %s", addr)
}

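// stopProcess sends an interrupt signal and waits up to 10 seconds for the
// process to exit before escalating to a hard kill.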
func stopProcess(cmd *exec.Cmd) {
	if cmd == nil || cmd.Process == nil {
		return
	}

	_ = cmd.Process.Signal(os.Interrupt)
	done := make(chan error, 1)
	go func() {
		done <- cmd.Wait()
	}()

	select {
	case <-time.After(10 * time.Second):
		_ = cmd.Process.Kill()
		<-done
	case <-done:
	}
}

func newWorkDir() (dir string, keepLogs bool, err error) {
	keepLogs = os.Getenv("VOLUME_SERVER_IT_KEEP_LOGS") == "1"
	dir, err = os.MkdirTemp("", "seaweedfs_volume_server_it_")
	return dir, keepLogs, err
}

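// writeSecurityConfig renders security.toml for the profile. A JWT-enabled
// profile produces roughly the following (keys come from the profile; the
// values shown are placeholders):
//
//	[jwt.signing]
//	key = "<signing-key>"
//	expires_after_seconds = 60
//
//	[jwt.signing.read]
//	key = "<read-key>"
//	expires_after_seconds = 60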
func writeSecurityConfig(configDir string, profile matrix.Profile) error {
	var b strings.Builder
	if profile.EnableJWT {
		if profile.JWTSigningKey == "" || profile.JWTReadKey == "" {
			return errors.New("jwt profile requires both write and read keys")
		}
		b.WriteString("[jwt.signing]\n")
		b.WriteString("key = \"")
		b.WriteString(profile.JWTSigningKey)
		b.WriteString("\"\n")
		b.WriteString("expires_after_seconds = 60\n\n")

		b.WriteString("[jwt.signing.read]\n")
		b.WriteString("key = \"")
		b.WriteString(profile.JWTReadKey)
		b.WriteString("\"\n")
		b.WriteString("expires_after_seconds = 60\n")
	}
	if profile.EnableUIAccess {
		if b.Len() > 0 {
			b.WriteString("\n")
		}
		b.WriteString("[access]\n")
		b.WriteString("ui = true\n")
	}
	if b.Len() == 0 {
		b.WriteString("# optional security config generated for integration tests\n")
	}
	return os.WriteFile(filepath.Join(configDir, "security.toml"), []byte(b.String()), 0o644)
}

// FindOrBuildWeedBinary returns an executable weed binary, building one when needed.
func FindOrBuildWeedBinary() (string, error) {
	if fromEnv := os.Getenv("WEED_BINARY"); fromEnv != "" {
		if isExecutableFile(fromEnv) {
			return fromEnv, nil
		}
		return "", fmt.Errorf("WEED_BINARY is set but not executable: %s", fromEnv)
	}

	weedBinaryOnce.Do(func() {
		repoRoot := ""
		if _, file, _, ok := runtime.Caller(0); ok {
			repoRoot = filepath.Clean(filepath.Join(filepath.Dir(file), "..", "..", ".."))
		}
		if repoRoot == "" {
			weedBinaryErr = errors.New("unable to detect repository root")
			return
		}

		binDir := filepath.Join(os.TempDir(), "seaweedfs_volume_server_it_bin")
		if err := os.MkdirAll(binDir, 0o755); err != nil {
			weedBinaryErr = fmt.Errorf("create binary directory %s: %w", binDir, err)
			return
		}
		binPath := filepath.Join(binDir, "weed")

		cmd := exec.Command("go", "build", "-o", binPath, ".")
		cmd.Dir = filepath.Join(repoRoot, "weed")
		var out bytes.Buffer
		cmd.Stdout = &out
		cmd.Stderr = &out
		if err := cmd.Run(); err != nil {
			weedBinaryErr = fmt.Errorf("build weed binary: %w\n%s", err, out.String())
			return
		}
		if !isExecutableFile(binPath) {
			weedBinaryErr = fmt.Errorf("built weed binary is not executable: %s", binPath)
			return
		}
		weedBinaryPath = binPath
	})

	if weedBinaryErr != nil {
		return "", weedBinaryErr
	}
	return weedBinaryPath, nil
}

func isExecutableFile(path string) bool {
	info, err := os.Stat(path)
	if err != nil || info.IsDir() {
		return false
	}
	mode := info.Mode().Perm()
	return mode&0o111 != 0
}

func (c *Cluster) tailLog(logName string) string {
	f, err := os.Open(filepath.Join(c.logsDir, logName))
	if err != nil {
		return ""
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	lines := make([]string, 0, 40)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
		if len(lines) > 40 {
			lines = lines[1:]
		}
	}
	return strings.Join(lines, "\n")
}

func (c *Cluster) MasterAddress() string {
	return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.masterPort))
}

func (c *Cluster) VolumeAdminAddress() string {
	return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePort))
}

func (c *Cluster) VolumePublicAddress() string {
	return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePubPort))
}

func (c *Cluster) VolumeGRPCAddress() string {
	return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumeGrpcPort))
}

// VolumeServerAddress returns SeaweedFS server address format: ip:httpPort.grpcPort
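// (for example "127.0.0.1:8080.18080" when the HTTP port is 8080 and the gRPC
// port is 18080; actual tests get randomly allocated ports).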
func (c *Cluster) VolumeServerAddress() string {
	return fmt.Sprintf("%s.%d", c.VolumeAdminAddress(), c.volumeGrpcPort)
}

func (c *Cluster) MasterURL() string {
	return "http://" + c.MasterAddress()
}

func (c *Cluster) VolumeAdminURL() string {
	return "http://" + c.VolumeAdminAddress()
}

func (c *Cluster) VolumePublicURL() string {
	return "http://" + c.VolumePublicAddress()
}

func (c *Cluster) BaseDir() string {
	return c.baseDir
}

// VolumeDataDirs returns the data directories the volume server was started with.
// Index 0 corresponds to DiskLocation 0, index 1 to DiskLocation 1, and so on.
// Tests can scan these directories to verify where files physically landed.
func (c *Cluster) VolumeDataDirs() []string {
	return append([]string(nil), c.volumeDataDirs...)
}