Files
Chris Lu d5c0a7b153 fix(ec): make multi-disk same-server EC reads work + full-lifecycle integration test (#9487)
* fix(master): include GrpcPort in LookupEcVolume response

LookupVolume already passes loc.GrpcPort through to the client; LookupEcVolume
builds Location with only Url / PublicUrl / DataCenter, so callers fall back to
ServerToGrpcAddress (httpPort + 10000). On any deployment where that
convention does not hold — multi-disk integration tests, custom port layouts
— EC reads dial the wrong port and quietly degrade to parity recovery.

* fix(volume/ec): probe every DiskLocation when serving local shard reads

reconcileEcShardsAcrossDisks (issue 9212) registers each .ec?? against the
DiskLocation that physically owns it, so a multi-disk volume server can hold
shards for the same vid in two separate ecVolumes — one per disk — with .ecx
on whichever disk owned the original .dat. The read path only consulted the
single EcVolume FindEcVolume picked, so requests for shards on the sibling
disk fell through to errShardNotLocal and then to remote/loopback recovery.

Walk all DiskLocations after the first probe in both readLocalEcShardInterval
and the VolumeEcShardRead gRPC handler; the latter also covers the loopback
that recoverOneRemoteEcShardInterval falls back to when a peer dial fails.

* test(volume/ec): cover the multi-disk EC lifecycle end-to-end

Two integration tests against a real volume server with two data dirs:

TestEcLifecycleAcrossMultipleDisks drives encode -> mount -> HTTP read ->
drop .dat -> stop -> redistribute shards across disks -> restart -> verify
reconcileEcShardsAcrossDisks attached the orphan shards and reads still
work -> blob delete -> stop -> drop a shard -> restart -> VolumeEcShardsRebuild
pulls input from both disks -> reads still work.

TestEcPartialShardsOnSiblingDiskCleanedUpOnRestart is the issue 9478
reproducer at the cluster level: seed a healthy .dat on disk 0, plant the
on-disk footprint of an interrupted EC encode on disk 1, restart, and assert
pruneIncompleteEcWithSiblingDat wipes disk 1 without touching disk 0.

Framework gets RestartVolumeServer / StopVolumeServer helpers; the previous
run's volume.log is rotated to volume.log.previous so a startup regression on
the second run does not lose the first run's diagnostics.

* review: trim verbose comments

* review: drop racy fast-path, use locked findEcShard directly

gemini-code-assist flagged the two-step lookup in readLocalEcShardInterval
and VolumeEcShardRead: the first probe (ecVolume.FindEcVolumeShard) reads
the EcVolume's Shards slice without holding ecVolumesLock, so a concurrent
mount / unmount could race with it. findEcShard already walks every
DiskLocation under the right lock, so the fast-path adds nothing but the
race. Collapse both call sites to a single locked call.

Also note in RestartVolumeServer why the log-rotation error is swallowed:
absence on first call is benign; anything else surfaces in the next
os.Create in startVolume.
2026-05-13 13:56:20 -07:00

490 lines
14 KiB
Go

package framework
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/test/testutil"
"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
)
// Polling and sizing defaults used by the harness in this file.
const (
// defaultWaitTimeout bounds how long waitForHTTP and waitForTCP keep polling
// before they give up with a timeout error.
defaultWaitTimeout = 30 * time.Second
// defaultWaitTick is the pause between two consecutive readiness probes.
defaultWaitTick = 200 * time.Millisecond
// testVolumeSizeLimitMB is handed to the master as -volumeSizeLimitMB.
testVolumeSizeLimitMB = 32
)
// Cached result of the weed build performed by FindOrBuildWeedBinary.
var (
// weedBinaryOnce ensures the build step runs at most once per test process.
weedBinaryOnce sync.Once
// weedBinaryPath is the built binary's path; set only when the build succeeded.
weedBinaryPath string
// weedBinaryErr records why the build step failed, if it did.
weedBinaryErr error
)
// Cluster is a lightweight SeaweedFS master + one volume server test harness.
// It owns the two child processes, the temporary directory tree they run in,
// and the ports they listen on.
type Cluster struct {
// testingTB is the test handle used for Helper, Fatalf, Logf and Failed.
testingTB testing.TB
// profile selects the security config contents and the volume server flags.
profile matrix.Profile
// weedBinary is the path of the weed executable launched for both processes.
weedBinary string
// baseDir is the temporary root; it is the child processes' working
// directory and is removed by Stop unless logs are being kept.
baseDir string
// configDir holds security.toml and is passed as -config_dir.
configDir string
// logsDir holds master.log, volume.log and volume.log.previous.
logsDir string
// keepLogs is true when VOLUME_SERVER_IT_KEEP_LOGS=1; Stop then leaves
// baseDir on disk.
keepLogs bool
// masterPort / masterGrpcPort are the master's HTTP and gRPC ports.
masterPort int
masterGrpcPort int
// volumePort / volumeGrpcPort are the volume server's admin HTTP and gRPC ports.
volumePort int
volumeGrpcPort int
// volumePubPort equals volumePort unless profile.SplitPublicPort is set.
volumePubPort int
// masterCmd and volumeCmd are the running child processes; volumeCmd is
// reset to nil while the volume server is stopped.
masterCmd *exec.Cmd
volumeCmd *exec.Cmd
// volumeDataDirs lists the directories passed to the volume server's -dir flag.
volumeDataDirs []string
// cleanupOnce makes the teardown in Stop run only once.
cleanupOnce sync.Once
}
// StartSingleVolumeCluster boots one master and one volume server that uses a
// single data directory. It is shorthand for
// StartSingleVolumeClusterWithDataDirs(t, profile, 1).
func StartSingleVolumeCluster(t testing.TB, profile matrix.Profile) *Cluster {
return StartSingleVolumeClusterWithDataDirs(t, profile, 1)
}
// StartSingleVolumeClusterWithDataDirs launches a master plus a single volume
// server whose -dir flag lists dataDirCount directories. The volume server
// treats every directory as a separate DiskLocation, which is what multi-disk
// EC placement tests need.
//
// The call fails the test (t.Fatalf) if any setup step or readiness probe
// fails; in that case already-started processes are stopped first. On success
// Stop is registered with t.Cleanup and the running cluster is returned.
func StartSingleVolumeClusterWithDataDirs(t testing.TB, profile matrix.Profile, dataDirCount int) *Cluster {
	t.Helper()

	if dataDirCount < 1 {
		t.Fatalf("dataDirCount must be >= 1, got %d", dataDirCount)
	}

	binary, err := FindOrBuildWeedBinary()
	if err != nil {
		t.Fatalf("resolve weed binary: %v", err)
	}

	workDir, keepLogs, err := newWorkDir()
	if err != nil {
		t.Fatalf("create temp test directory: %v", err)
	}

	var (
		configDir = filepath.Join(workDir, "config")
		logsDir   = filepath.Join(workDir, "logs")
		masterDir = filepath.Join(workDir, "master")
	)

	// A lone data dir keeps the historical baseDir/volume path because
	// existing fixtures (CorruptDatFile etc.) hardcode it; only multi-dir
	// clusters switch to the numbered "volumeN" layout.
	var volumeDirs []string
	if dataDirCount == 1 {
		volumeDirs = []string{filepath.Join(workDir, "volume")}
	} else {
		volumeDirs = make([]string, 0, dataDirCount)
		for idx := 0; idx < dataDirCount; idx++ {
			volumeDirs = append(volumeDirs, filepath.Join(workDir, fmt.Sprintf("volume%d", idx)))
		}
	}

	for _, dir := range append([]string{configDir, logsDir, masterDir}, volumeDirs...) {
		if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil {
			t.Fatalf("create %s: %v", dir, mkErr)
		}
	}

	if err = writeSecurityConfig(configDir, profile); err != nil {
		t.Fatalf("write security config: %v", err)
	}

	miniPorts, ports, err := testutil.AllocatePortSet(1, 3)
	if err != nil {
		t.Fatalf("allocate ports: %v", err)
	}

	// The public port shares the admin port unless the profile asks for a
	// dedicated one.
	publicPort := ports[0]
	if profile.SplitPublicPort {
		publicPort = ports[2]
	}

	cluster := &Cluster{
		testingTB:      t,
		profile:        profile,
		weedBinary:     binary,
		baseDir:        workDir,
		configDir:      configDir,
		logsDir:        logsDir,
		keepLogs:       keepLogs,
		masterPort:     miniPorts[0],
		masterGrpcPort: miniPorts[0] + testutil.GrpcPortOffset,
		volumePort:     ports[0],
		volumeGrpcPort: ports[1],
		volumePubPort:  publicPort,
	}

	// Each failure path grabs the relevant log tail before Stop, because Stop
	// may delete the log directory.
	if err = cluster.startMaster(masterDir); err != nil {
		cluster.Stop()
		t.Fatalf("start master: %v", err)
	}
	if err = cluster.waitForHTTP(cluster.MasterURL() + "/dir/status"); err != nil {
		tail := cluster.tailLog("master.log")
		cluster.Stop()
		t.Fatalf("wait for master readiness: %v\nmaster log tail:\n%s", err, tail)
	}

	if err = cluster.startVolume(volumeDirs); err != nil {
		tail := cluster.tailLog("master.log")
		cluster.Stop()
		t.Fatalf("start volume: %v\nmaster log tail:\n%s", err, tail)
	}
	cluster.volumeDataDirs = volumeDirs

	if err = cluster.waitForHTTP(cluster.VolumeAdminURL() + "/status"); err != nil {
		tail := cluster.tailLog("volume.log")
		cluster.Stop()
		t.Fatalf("wait for volume readiness: %v\nvolume log tail:\n%s", err, tail)
	}
	if err = cluster.waitForTCP(cluster.VolumeGRPCAddress()); err != nil {
		tail := cluster.tailLog("volume.log")
		cluster.Stop()
		t.Fatalf("wait for volume grpc readiness: %v\nvolume log tail:\n%s", err, tail)
	}

	t.Cleanup(cluster.Stop)
	return cluster
}
// Stop shuts down the volume server and then the master, and afterwards
// removes the temporary directory tree unless logs must be preserved (keepLogs
// is set or the test has already failed). It is safe to call on a nil Cluster
// and safe to call repeatedly; the teardown runs only once.
func (c *Cluster) Stop() {
	if c == nil {
		return
	}

	teardown := func() {
		stopProcess(c.volumeCmd)
		stopProcess(c.masterCmd)

		preserve := c.keepLogs || c.testingTB.Failed()
		switch {
		case !preserve:
			_ = os.RemoveAll(c.baseDir)
		case c.baseDir != "":
			c.testingTB.Logf("volume server integration logs kept at %s", c.baseDir)
		}
	}
	c.cleanupOnce.Do(teardown)
}
// RestartVolumeServer stops the current volume server process and launches a
// fresh one with the same data directories and ports, leaving the master
// untouched. Before the new process starts, volume.log is renamed to
// volume.log.previous so the earlier run's output is still available if the
// second start goes wrong. The test is failed if the new process cannot be
// started or does not become ready on its HTTP and gRPC endpoints.
func (c *Cluster) RestartVolumeServer() {
	c.testingTB.Helper()

	stopProcess(c.volumeCmd)
	c.volumeCmd = nil

	// Best-effort rotation: the file may legitimately be missing, and any
	// genuine filesystem problem shows up right away when startVolume calls
	// os.Create on the same path.
	currentLog := filepath.Join(c.logsDir, "volume.log")
	rotatedLog := filepath.Join(c.logsDir, "volume.log.previous")
	_ = os.Rename(currentLog, rotatedLog)

	err := c.startVolume(c.volumeDataDirs)
	if err != nil {
		c.testingTB.Fatalf("restart volume server: %v", err)
	}

	statusURL := c.VolumeAdminURL() + "/status"
	if err = c.waitForHTTP(statusURL); err != nil {
		c.testingTB.Fatalf("wait for volume admin readiness after restart: %v\nvolume log tail:\n%s", err, c.tailLog("volume.log"))
	}

	grpcAddr := c.VolumeGRPCAddress()
	if err = c.waitForTCP(grpcAddr); err != nil {
		c.testingTB.Fatalf("wait for volume grpc readiness after restart: %v\nvolume log tail:\n%s", err, c.tailLog("volume.log"))
	}
}
// StopVolumeServer kills the volume server but leaves the master and data
// dirs alone. Pair with RestartVolumeServer or Stop.
func (c *Cluster) StopVolumeServer() {
c.testingTB.Helper()
stopProcess(c.volumeCmd)
// Clear the handle so a later Stop or RestartVolumeServer sees nil and
// stopProcess returns early instead of waiting on this process again.
c.volumeCmd = nil
}
// startMaster launches the master process with its metadata stored in dataDir
// and both stdout and stderr redirected to logs/master.log. It returns as soon
// as the process has been started; callers must probe for readiness
// themselves. The returned error comes from creating the log file or from
// starting the process.
func (c *Cluster) startMaster(dataDir string) error {
	logFile, err := os.Create(filepath.Join(c.logsDir, "master.log"))
	if err != nil {
		return err
	}
	// The child is wired directly to the file at Start, so the parent's handle
	// is not needed afterwards. Closing it here avoids leaking a descriptor,
	// including when Start itself fails.
	defer logFile.Close()

	args := []string{
		"-config_dir=" + c.configDir,
		"master",
		"-ip=127.0.0.1",
		"-port=" + strconv.Itoa(c.masterPort),
		"-port.grpc=" + strconv.Itoa(c.masterGrpcPort),
		"-mdir=" + dataDir,
		"-peers=none",
		"-volumeSizeLimitMB=" + strconv.Itoa(testVolumeSizeLimitMB),
		"-defaultReplication=000",
	}

	c.masterCmd = exec.Command(c.weedBinary, args...)
	c.masterCmd.Dir = c.baseDir
	c.masterCmd.Stdout = logFile
	c.masterCmd.Stderr = logFile
	return c.masterCmd.Start()
}
// startVolume launches the volume server with one DiskLocation per entry in
// dataDirs and both stdout and stderr redirected to logs/volume.log (the file
// is truncated if it already exists). Flags are derived from the cluster's
// ports and profile. It returns as soon as the process has been started;
// callers must probe for readiness themselves. The returned error comes from
// creating the log file or from starting the process.
func (c *Cluster) startVolume(dataDirs []string) error {
	logFile, err := os.Create(filepath.Join(c.logsDir, "volume.log"))
	if err != nil {
		return err
	}
	// The child is wired directly to the file at Start, so the parent's handle
	// is not needed afterwards. Closing it here keeps RestartVolumeServer from
	// leaking one descriptor per restart, and also covers a failed Start.
	defer logFile.Close()

	// -max takes one entry per -dir entry.
	maxPerDir := make([]string, len(dataDirs))
	for i := range dataDirs {
		maxPerDir[i] = "16"
	}

	args := []string{
		"-config_dir=" + c.configDir,
		"volume",
		"-ip=127.0.0.1",
		"-port=" + strconv.Itoa(c.volumePort),
		"-port.grpc=" + strconv.Itoa(c.volumeGrpcPort),
		"-port.public=" + strconv.Itoa(c.volumePubPort),
		"-dir=" + strings.Join(dataDirs, ","),
		"-max=" + strings.Join(maxPerDir, ","),
		"-master=127.0.0.1:" + strconv.Itoa(c.masterPort),
		"-readMode=" + c.profile.ReadMode,
		"-concurrentUploadLimitMB=" + strconv.Itoa(c.profile.ConcurrentUploadLimitMB),
		"-concurrentDownloadLimitMB=" + strconv.Itoa(c.profile.ConcurrentDownloadLimitMB),
		// Integration tests deliberately exercise loopback S3 endpoints
		// (the test rig boots weed-mini next to the volume server); allow
		// the SSRF guard to be bypassed for them.
		"-volume.allowUntrustedRemoteEndpoints",
	}
	// The in-flight timeouts are optional: only pass them when the profile
	// sets a positive value.
	if c.profile.InflightUploadTimeout > 0 {
		args = append(args, "-inflightUploadDataTimeout="+c.profile.InflightUploadTimeout.String())
	}
	if c.profile.InflightDownloadTimeout > 0 {
		args = append(args, "-inflightDownloadDataTimeout="+c.profile.InflightDownloadTimeout.String())
	}

	c.volumeCmd = exec.Command(c.weedBinary, args...)
	c.volumeCmd.Dir = c.baseDir
	c.volumeCmd.Stdout = logFile
	c.volumeCmd.Stderr = logFile
	return c.volumeCmd.Start()
}
// waitForHTTP polls url with GET requests until a response with a status code
// below 500 arrives, pausing defaultWaitTick between attempts. It returns nil
// on the first such response and a timeout error once defaultWaitTimeout has
// elapsed. Every response body is drained and closed.
func (c *Cluster) waitForHTTP(url string) error {
	prober := &http.Client{Timeout: 1 * time.Second}

	for giveUpAt := time.Now().Add(defaultWaitTimeout); time.Now().Before(giveUpAt); time.Sleep(defaultWaitTick) {
		resp, err := prober.Get(url)
		if err != nil {
			continue
		}
		_, _ = io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
		if resp.StatusCode < 500 {
			return nil
		}
	}
	return fmt.Errorf("timed out waiting for %s", url)
}
// waitForTCP repeatedly dials addr over TCP until a connection succeeds,
// pausing defaultWaitTick between attempts. The probe connection is closed
// immediately. It returns a timeout error once defaultWaitTimeout has elapsed
// without a successful dial.
func (c *Cluster) waitForTCP(addr string) error {
	for giveUpAt := time.Now().Add(defaultWaitTimeout); time.Now().Before(giveUpAt); time.Sleep(defaultWaitTick) {
		probe, dialErr := net.DialTimeout("tcp", addr, time.Second)
		if dialErr != nil {
			continue
		}
		_ = probe.Close()
		return nil
	}
	return fmt.Errorf("timed out waiting for tcp %s", addr)
}
// stopProcess asks a started child process to exit and waits until it has been
// reaped. A nil cmd, or one that was never started, is ignored.
//
// The process first receives os.Interrupt so it can shut down cleanly. If the
// signal cannot be delivered (for example on platforms where os.Interrupt is
// not supported) the process is killed immediately rather than after the
// grace period. A process that has not exited within 10 seconds is killed.
func stopProcess(cmd *exec.Cmd) {
	if cmd == nil || cmd.Process == nil {
		return
	}

	if err := cmd.Process.Signal(os.Interrupt); err != nil {
		// Nothing will trigger a graceful exit, so do not burn the whole
		// grace period. If the process is already gone, Kill is a no-op error.
		_ = cmd.Process.Kill()
	}

	// Buffered so the goroutine can always deliver its result and finish.
	done := make(chan error, 1)
	go func() {
		done <- cmd.Wait()
	}()

	grace := time.NewTimer(10 * time.Second)
	defer grace.Stop()

	select {
	case <-grace.C:
		_ = cmd.Process.Kill()
		<-done
	case <-done:
	}
}
// newWorkDir creates a fresh temporary directory for one cluster and reports
// whether the caller asked for logs to be kept, i.e. whether the environment
// variable VOLUME_SERVER_IT_KEEP_LOGS is exactly "1". err is the error from
// creating the directory, if any.
func newWorkDir() (dir string, keepLogs bool, err error) {
	tmp, mkErr := os.MkdirTemp("", "seaweedfs_volume_server_it_")
	keep := os.Getenv("VOLUME_SERVER_IT_KEEP_LOGS") == "1"
	return tmp, keep, mkErr
}
// writeSecurityConfig renders security.toml into configDir from the profile.
// With JWT enabled it emits [jwt.signing] and [jwt.signing.read] sections and
// returns an error if either key is empty. With UI access enabled it emits an
// [access] section. Sections are separated by a blank line; when neither
// option is enabled the file contains a single placeholder comment.
func writeSecurityConfig(configDir string, profile matrix.Profile) error {
	var sections []string

	if profile.EnableJWT {
		if profile.JWTSigningKey == "" || profile.JWTReadKey == "" {
			return errors.New("jwt profile requires both write and read keys")
		}
		jwtSection := "[jwt.signing]\n" +
			"key = \"" + profile.JWTSigningKey + "\"\n" +
			"expires_after_seconds = 60\n\n" +
			"[jwt.signing.read]\n" +
			"key = \"" + profile.JWTReadKey + "\"\n" +
			"expires_after_seconds = 60\n"
		sections = append(sections, jwtSection)
	}

	if profile.EnableUIAccess {
		sections = append(sections, "[access]\n"+"ui = true\n")
	}

	content := "# optional security config generated for integration tests\n"
	if len(sections) > 0 {
		content = strings.Join(sections, "\n")
	}

	target := filepath.Join(configDir, "security.toml")
	return os.WriteFile(target, []byte(content), 0o644)
}
// FindOrBuildWeedBinary returns the path of a weed executable.
//
// If WEED_BINARY is set it must point at an executable file; that path is
// returned as-is, or an error if it is not executable. Otherwise the binary is
// built with "go build" from the repository's weed directory (located relative
// to this source file) into a directory under os.TempDir(). The build runs at
// most once per process; later calls return the cached path or the cached
// build error.
func FindOrBuildWeedBinary() (string, error) {
	override := os.Getenv("WEED_BINARY")
	if override != "" {
		if !isExecutableFile(override) {
			return "", fmt.Errorf("WEED_BINARY is set but not executable: %s", override)
		}
		return override, nil
	}

	weedBinaryOnce.Do(func() {
		// Locate the repository root three levels above this file.
		var repoRoot string
		_, thisFile, _, ok := runtime.Caller(0)
		if ok {
			repoRoot = filepath.Clean(filepath.Join(filepath.Dir(thisFile), "..", "..", ".."))
		}
		if repoRoot == "" {
			weedBinaryErr = errors.New("unable to detect repository root")
			return
		}

		outDir := filepath.Join(os.TempDir(), "seaweedfs_volume_server_it_bin")
		if mkErr := os.MkdirAll(outDir, 0o755); mkErr != nil {
			weedBinaryErr = fmt.Errorf("create binary directory %s: %w", outDir, mkErr)
			return
		}

		target := filepath.Join(outDir, "weed")

		// Capture combined build output so a failure can be reported in full.
		var buildLog bytes.Buffer
		build := exec.Command("go", "build", "-o", target, ".")
		build.Dir = filepath.Join(repoRoot, "weed")
		build.Stdout = &buildLog
		build.Stderr = &buildLog
		if runErr := build.Run(); runErr != nil {
			weedBinaryErr = fmt.Errorf("build weed binary: %w\n%s", runErr, buildLog.String())
			return
		}

		if !isExecutableFile(target) {
			weedBinaryErr = fmt.Errorf("built weed binary is not executable: %s", target)
			return
		}
		weedBinaryPath = target
	})

	if weedBinaryErr != nil {
		return "", weedBinaryErr
	}
	return weedBinaryPath, nil
}
// isExecutableFile reports whether path names an existing non-directory entry
// with at least one execute permission bit (owner, group or other) set.
func isExecutableFile(path string) bool {
	fi, statErr := os.Stat(path)
	switch {
	case statErr != nil:
		return false
	case fi.IsDir():
		return false
	}
	return fi.Mode().Perm()&0o111 != 0
}
// tailLog returns the last 40 lines of logs/<logName> joined with "\n", with
// line terminators ("\n" or "\r\n") stripped. It returns "" when the file
// cannot be opened. If reading fails part-way through, the lines collected up
// to that point are returned.
//
// Lines are read with bufio.Reader rather than bufio.Scanner so that a single
// very long log line cannot stop the scan early and hide the real tail.
func (c *Cluster) tailLog(logName string) string {
	const maxLines = 40

	f, err := os.Open(filepath.Join(c.logsDir, logName))
	if err != nil {
		return ""
	}
	defer f.Close()

	reader := bufio.NewReader(f)
	// One spare slot lets a line be appended before the oldest is dropped.
	lines := make([]string, 0, maxLines+1)
	for {
		line, readErr := reader.ReadString('\n')
		// A non-empty result is a real line even when readErr is io.EOF
		// (final line without a trailing newline).
		if line != "" {
			line = strings.TrimSuffix(line, "\n")
			line = strings.TrimSuffix(line, "\r")
			lines = append(lines, line)
			if len(lines) > maxLines {
				// Shift in place so the backing array is reused.
				copy(lines, lines[1:])
				lines = lines[:maxLines]
			}
		}
		if readErr != nil {
			break
		}
	}
	return strings.Join(lines, "\n")
}
// MasterAddress returns the master's HTTP endpoint as "127.0.0.1:<masterPort>".
func (c *Cluster) MasterAddress() string {
return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.masterPort))
}
// VolumeAdminAddress returns the volume server's admin HTTP endpoint as
// "127.0.0.1:<volumePort>".
func (c *Cluster) VolumeAdminAddress() string {
return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePort))
}
// VolumePublicAddress returns the volume server's public HTTP endpoint as
// "127.0.0.1:<volumePubPort>".
func (c *Cluster) VolumePublicAddress() string {
return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePubPort))
}
// VolumeGRPCAddress returns the volume server's gRPC endpoint as
// "127.0.0.1:<volumeGrpcPort>".
func (c *Cluster) VolumeGRPCAddress() string {
return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumeGrpcPort))
}
// VolumeServerAddress returns the volume server in SeaweedFS server address
// format, ip:httpPort.grpcPort, built from the admin address and the gRPC port.
func (c *Cluster) VolumeServerAddress() string {
return fmt.Sprintf("%s.%d", c.VolumeAdminAddress(), c.volumeGrpcPort)
}
// MasterURL returns MasterAddress prefixed with "http://".
func (c *Cluster) MasterURL() string {
return "http://" + c.MasterAddress()
}
// VolumeAdminURL returns VolumeAdminAddress prefixed with "http://".
func (c *Cluster) VolumeAdminURL() string {
return "http://" + c.VolumeAdminAddress()
}
// VolumePublicURL returns VolumePublicAddress prefixed with "http://".
func (c *Cluster) VolumePublicURL() string {
return "http://" + c.VolumePublicAddress()
}
// BaseDir returns the cluster's temporary root directory, which contains the
// config, logs, master and volume data directories.
func (c *Cluster) BaseDir() string {
return c.baseDir
}
// VolumeDataDirs returns the data directories the volume server was started with.
// Index 0 corresponds to DiskLocation 0, index 1 to DiskLocation 1, and so on.
// Tests can scan these directories to verify where files physically landed.
// The result is a copy, so callers may modify it freely.
func (c *Cluster) VolumeDataDirs() []string {
// Appending to a nil slice allocates new backing storage for the copy.
return append([]string(nil), c.volumeDataDirs...)
}