seaweedfs/test/volume_server/framework/cluster.go
Chris Lu c4e1885053 fix(ec): honor disk_id in ReceiveFile so EC shards respect admin placement (#9184) (#9185)
* test(volume_server): reproduce #9184 EC ReceiveFile disk-placement bug

The plugin-worker EC task sends shards via ReceiveFile, which picks
Locations[0] as the target directory regardless of the admin planner's
TargetDisk assignment. ReceiveFileInfo has no disk_id field, so there
is no wire channel to honor the plan.

Adds StartSingleVolumeClusterWithDataDirs to the integration framework
so tests can launch a volume server with N data directories. The new
repro asserts the current (buggy) behavior: sending three distinct EC
shards via ReceiveFile leaves all three files in dir[0] and the other
dirs empty. When the fix adds disk_id to ReceiveFileInfo, this
assertion must flip to verify the planned placement is respected.

* fix(ec): honor disk_id in ReceiveFile so EC shards respect admin placement

Before this change, VolumeServer.ReceiveFile for EC shards always
selected the first HDD location (Locations[0]). The plugin-worker EC
task had no way to pass the admin planner's per-shard disk
assignment — ReceiveFileInfo carried no disk_id field — so every
received EC shard piled onto a single disk per destination server.
On multi-disk servers this caused uneven load (one disk absorbing all
EC shard I/O), frequent ENOSPC retries, and a growing EC backlog
under sustained ingest (see issue #9184).

Changes:
- proto: add disk_id to ReceiveFileInfo, mirroring
  VolumeEcShardsCopyRequest.disk_id.
- worker: DistributeEcShards tracks the planner-assigned disk per
  shard; sendShardFileToDestination forwards that disk id. Metadata
  files (ecx/ecj/vif) inherit the disk of the first data shard
  targeting the same node so they land next to the shards.
- server: ReceiveFile honors disk_id when > 0 with bounds
  validation; disk_id=0 (unset) falls back to the same
  auto-selection pattern as VolumeEcShardsCopy (prefer disk that
  already has shards for this volume, then any HDD with free space,
  then any location with free space).
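
The server-side selection order described above can be sketched as a small
standalone program. This is only an illustration under stated assumptions:
the `location` type, its fields, and `pickLocation` are hypothetical names,
not the volume server's real types, and whether the real "already has
shards" step also checks free space is not stated here.

```go
package main

import (
	"errors"
	"fmt"
)

// location is a hypothetical stand-in for one disk location on a volume server.
type location struct {
	hasShards bool // already holds EC shards for the volume being received
	isHDD     bool
	hasSpace  bool
}

var errNoSpace = errors.New("no location with free space")

// pickLocation returns the index of the location that should receive a shard.
// diskID > 0 is honored after a bounds check; diskID == 0 means "unset" and
// falls back to auto-selection.
func pickLocation(locs []location, diskID uint32) (int, error) {
	if diskID > 0 {
		if diskID >= uint32(len(locs)) {
			return -1, fmt.Errorf("disk_id %d out of range (%d locations)", diskID, len(locs))
		}
		return int(diskID), nil
	}
	// 1. Prefer a disk that already has shards for this volume.
	for i, l := range locs {
		if l.hasShards {
			return i, nil
		}
	}
	// 2. Then any HDD with free space.
	for i, l := range locs {
		if l.isHDD && l.hasSpace {
			return i, nil
		}
	}
	// 3. Then any location with free space.
	for i, l := range locs {
		if l.hasSpace {
			return i, nil
		}
	}
	return -1, errNoSpace
}

func main() {
	locs := []location{
		{isHDD: true, hasSpace: true},
		{isHDD: true, hasSpace: true},
		{isHDD: true, hasSpace: true},
	}
	for _, id := range []uint32{1, 2, 0, 7} {
		idx, err := pickLocation(locs, id)
		fmt.Println(id, idx, err)
	}
}
```

With no shards present on any disk, disk_id=0 resolves to the first HDD,
which matches the {1,2,0} expectation in the test description below.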

Tests updated:
- TestReceiveFileEcShardHonorsDiskID asserts three shards sent with
  disk_id={1,2,0} land on data dirs 1, 2, and 0 respectively.
- TestReceiveFileEcShardRejectsInvalidDiskID pins the out-of-range
  disk_id rejection path.

* fix(volume-rust): honor disk_id in ReceiveFile for EC shards

Mirror the Go-side change: when disk_id > 0 place the EC shard on the
requested disk; when unset, auto-select with the same preference order
as volume_ec_shards_copy (disk already holding shards, then any HDD,
then any disk).

* fix(volume): compare disk_id as uint32 to avoid 32-bit overflow

On 32-bit Go builds `int(fileInfo.DiskId) >= len(Locations)` can wrap a
high-bit uint32 to a negative int, bypassing the bounds check before the
index operation. Compare in the uint32 domain instead.
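
A minimal sketch of the wraparound, assuming `int32` as a stand-in for the
32-bit `int` so that it reproduces on any platform. The function names are
hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// passesBuggyCheck mimics the old bounds check on a 32-bit build: the uint32
// is converted to a signed 32-bit value first, so high-bit inputs go negative
// and are never ">= n".
func passesBuggyCheck(diskID uint32, n int) bool {
	return !(int32(diskID) >= int32(n))
}

// passesFixedCheck compares in the uint32 domain, so no sign wrap can occur.
func passesFixedCheck(diskID uint32, n int) bool {
	return diskID < uint32(n)
}

func main() {
	const highBit = 1 << 31 // 0x80000000
	fmt.Println(passesBuggyCheck(highBit, 3), passesFixedCheck(highBit, 3))
	// prints: true false
}
```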

* test(ec): fail invalid-disk_id test on transport error

Previously a transport-level error from CloseAndRecv silently passed the
test by returning early, masking any real gRPC failure. Fail loudly so
only the structured ReceiveFileResponse rejection path counts as a pass.

* docs(test): explain why DiskId=0 auto-selects dir 0 in EC placement test

Documents the load-bearing assumption that shards are never mounted in
this test, so loc.FindEcVolume always returns false and auto-select
falls through to the first HDD. Saves future readers from re-deriving
the expected directory for the DiskId=0 case.

* fix(test): preserve baseDir/volume path for single-dir clusters

StartSingleVolumeClusterWithDataDirs started naming the data directory
volume0 even in the dataDirCount=1 case, which broke Scrub tests that
reach into baseDir/volume via CorruptDatFile / CorruptEcShardFile /
CorruptEcxFile. Keep the legacy name for single-dir clusters; only use
the indexed "volumeN" layout when multiple disks are requested.
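
The resulting naming rule, pulled out as a standalone sketch. `dataDirNames`
is a hypothetical helper name used only here; in the framework the same loop
lives inline in StartSingleVolumeClusterWithDataDirs.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// dataDirNames returns the data directory paths for a cluster with n disks:
// the legacy baseDir/volume for n == 1, otherwise baseDir/volume0..volumeN-1.
func dataDirNames(baseDir string, n int) []string {
	dirs := make([]string, n)
	for i := range dirs {
		if n == 1 {
			dirs[i] = filepath.Join(baseDir, "volume")
		} else {
			dirs[i] = filepath.Join(baseDir, fmt.Sprintf("volume%d", i))
		}
	}
	return dirs
}

func main() {
	fmt.Println(dataDirNames("base", 1))
	fmt.Println(dataDirNames("base", 3))
}
```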
2026-04-22 10:30:13 -07:00


package framework

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/test/testutil"
	"github.com/seaweedfs/seaweedfs/test/volume_server/matrix"
)

const (
	defaultWaitTimeout    = 30 * time.Second
	defaultWaitTick       = 200 * time.Millisecond
	testVolumeSizeLimitMB = 32
)

var (
	weedBinaryOnce sync.Once
	weedBinaryPath string
	weedBinaryErr  error
)

// Cluster is a lightweight SeaweedFS master + one volume server test harness.
type Cluster struct {
	testingTB      testing.TB
	profile        matrix.Profile
	weedBinary     string
	baseDir        string
	configDir      string
	logsDir        string
	keepLogs       bool
	masterPort     int
	masterGrpcPort int
	volumePort     int
	volumeGrpcPort int
	volumePubPort  int
	masterCmd      *exec.Cmd
	volumeCmd      *exec.Cmd
	volumeDataDirs []string
	cleanupOnce    sync.Once
}

// StartSingleVolumeCluster boots one master and one volume server.
func StartSingleVolumeCluster(t testing.TB, profile matrix.Profile) *Cluster {
	return StartSingleVolumeClusterWithDataDirs(t, profile, 1)
}

// StartSingleVolumeClusterWithDataDirs boots one master and one volume server
// with dataDirCount separate data directories (passed to -dir as a comma list).
// Each directory becomes its own DiskLocation on the volume server, letting
// tests exercise multi-disk EC placement paths.
func StartSingleVolumeClusterWithDataDirs(t testing.TB, profile matrix.Profile, dataDirCount int) *Cluster {
	t.Helper()

	if dataDirCount < 1 {
		t.Fatalf("dataDirCount must be >= 1, got %d", dataDirCount)
	}

	weedBinary, err := FindOrBuildWeedBinary()
	if err != nil {
		t.Fatalf("resolve weed binary: %v", err)
	}

	baseDir, keepLogs, err := newWorkDir()
	if err != nil {
		t.Fatalf("create temp test directory: %v", err)
	}

	configDir := filepath.Join(baseDir, "config")
	logsDir := filepath.Join(baseDir, "logs")
	masterDataDir := filepath.Join(baseDir, "master")
	volumeDataDirs := make([]string, dataDirCount)
	// Single-dir layout stays at baseDir/volume so existing fixtures
	// (CorruptDatFile etc.) that hardcode that path keep working. Only
	// multi-dir clusters get the "volumeN" layout.
	for i := 0; i < dataDirCount; i++ {
		if dataDirCount == 1 {
			volumeDataDirs[i] = filepath.Join(baseDir, "volume")
		} else {
			volumeDataDirs[i] = filepath.Join(baseDir, fmt.Sprintf("volume%d", i))
		}
	}
	setupDirs := append([]string{configDir, logsDir, masterDataDir}, volumeDataDirs...)
	for _, dir := range setupDirs {
		if mkErr := os.MkdirAll(dir, 0o755); mkErr != nil {
			t.Fatalf("create %s: %v", dir, mkErr)
		}
	}

	if err = writeSecurityConfig(configDir, profile); err != nil {
		t.Fatalf("write security config: %v", err)
	}

	miniPorts, ports, err := testutil.AllocatePortSet(1, 3)
	if err != nil {
		t.Fatalf("allocate ports: %v", err)
	}
	masterPort := miniPorts[0]
	masterGrpcPort := masterPort + testutil.GrpcPortOffset

	c := &Cluster{
		testingTB:      t,
		profile:        profile,
		weedBinary:     weedBinary,
		baseDir:        baseDir,
		configDir:      configDir,
		logsDir:        logsDir,
		keepLogs:       keepLogs,
		masterPort:     masterPort,
		masterGrpcPort: masterGrpcPort,
		volumePort:     ports[0],
		volumeGrpcPort: ports[1],
		volumePubPort:  ports[0],
	}
	if profile.SplitPublicPort {
		c.volumePubPort = ports[2]
	}

	if err = c.startMaster(masterDataDir); err != nil {
		c.Stop()
		t.Fatalf("start master: %v", err)
	}
	if err = c.waitForHTTP(c.MasterURL() + "/dir/status"); err != nil {
		masterLog := c.tailLog("master.log")
		c.Stop()
		t.Fatalf("wait for master readiness: %v\nmaster log tail:\n%s", err, masterLog)
	}

	if err = c.startVolume(volumeDataDirs); err != nil {
		masterLog := c.tailLog("master.log")
		c.Stop()
		t.Fatalf("start volume: %v\nmaster log tail:\n%s", err, masterLog)
	}
	c.volumeDataDirs = volumeDataDirs
	if err = c.waitForHTTP(c.VolumeAdminURL() + "/status"); err != nil {
		volumeLog := c.tailLog("volume.log")
		c.Stop()
		t.Fatalf("wait for volume readiness: %v\nvolume log tail:\n%s", err, volumeLog)
	}
	if err = c.waitForTCP(c.VolumeGRPCAddress()); err != nil {
		volumeLog := c.tailLog("volume.log")
		c.Stop()
		t.Fatalf("wait for volume grpc readiness: %v\nvolume log tail:\n%s", err, volumeLog)
	}

	t.Cleanup(func() {
		c.Stop()
	})

	return c
}

// Stop terminates all processes and removes the working directory. The
// directory is kept (and its path logged) when the test failed or when
// VOLUME_SERVER_IT_KEEP_LOGS=1 is set. Stop is safe to call more than once.
func (c *Cluster) Stop() {
	if c == nil {
		return
	}
	c.cleanupOnce.Do(func() {
		stopProcess(c.volumeCmd)
		stopProcess(c.masterCmd)
		if !c.keepLogs && !c.testingTB.Failed() {
			_ = os.RemoveAll(c.baseDir)
		} else if c.baseDir != "" {
			c.testingTB.Logf("volume server integration logs kept at %s", c.baseDir)
		}
	})
}

// startMaster launches the master process with dataDir as its -mdir,
// sending stdout and stderr to master.log in the logs directory.
func (c *Cluster) startMaster(dataDir string) error {
	logFile, err := os.Create(filepath.Join(c.logsDir, "master.log"))
	if err != nil {
		return err
	}

	args := []string{
		"-config_dir=" + c.configDir,
		"master",
		"-ip=127.0.0.1",
		"-port=" + strconv.Itoa(c.masterPort),
		"-port.grpc=" + strconv.Itoa(c.masterGrpcPort),
		"-mdir=" + dataDir,
		"-peers=none",
		"-volumeSizeLimitMB=" + strconv.Itoa(testVolumeSizeLimitMB),
		"-defaultReplication=000",
	}

	c.masterCmd = exec.Command(c.weedBinary, args...)
	c.masterCmd.Dir = c.baseDir
	c.masterCmd.Stdout = logFile
	c.masterCmd.Stderr = logFile
	return c.masterCmd.Start()
}

// startVolume launches the volume server with one -dir entry per data
// directory and a matching -max entry of 16 for each, sending stdout and
// stderr to volume.log in the logs directory.
func (c *Cluster) startVolume(dataDirs []string) error {
	logFile, err := os.Create(filepath.Join(c.logsDir, "volume.log"))
	if err != nil {
		return err
	}

	maxPerDir := make([]string, len(dataDirs))
	for i := range dataDirs {
		maxPerDir[i] = "16"
	}

	args := []string{
		"-config_dir=" + c.configDir,
		"volume",
		"-ip=127.0.0.1",
		"-port=" + strconv.Itoa(c.volumePort),
		"-port.grpc=" + strconv.Itoa(c.volumeGrpcPort),
		"-port.public=" + strconv.Itoa(c.volumePubPort),
		"-dir=" + strings.Join(dataDirs, ","),
		"-max=" + strings.Join(maxPerDir, ","),
		"-master=127.0.0.1:" + strconv.Itoa(c.masterPort),
		"-readMode=" + c.profile.ReadMode,
		"-concurrentUploadLimitMB=" + strconv.Itoa(c.profile.ConcurrentUploadLimitMB),
		"-concurrentDownloadLimitMB=" + strconv.Itoa(c.profile.ConcurrentDownloadLimitMB),
	}
	if c.profile.InflightUploadTimeout > 0 {
		args = append(args, "-inflightUploadDataTimeout="+c.profile.InflightUploadTimeout.String())
	}
	if c.profile.InflightDownloadTimeout > 0 {
		args = append(args, "-inflightDownloadDataTimeout="+c.profile.InflightDownloadTimeout.String())
	}

	c.volumeCmd = exec.Command(c.weedBinary, args...)
	c.volumeCmd.Dir = c.baseDir
	c.volumeCmd.Stdout = logFile
	c.volumeCmd.Stderr = logFile
	return c.volumeCmd.Start()
}

// waitForHTTP polls url every defaultWaitTick until it answers with a status
// code below 500 or defaultWaitTimeout elapses.
func (c *Cluster) waitForHTTP(url string) error {
	client := &http.Client{Timeout: 1 * time.Second}

	deadline := time.Now().Add(defaultWaitTimeout)
	for time.Now().Before(deadline) {
		resp, err := client.Get(url)
		if err == nil {
			_, _ = io.Copy(io.Discard, resp.Body)
			resp.Body.Close()
			if resp.StatusCode < 500 {
				return nil
			}
		}
		time.Sleep(defaultWaitTick)
	}

	return fmt.Errorf("timed out waiting for %s", url)
}

// waitForTCP polls addr every defaultWaitTick until a TCP connection succeeds
// or defaultWaitTimeout elapses.
func (c *Cluster) waitForTCP(addr string) error {
	deadline := time.Now().Add(defaultWaitTimeout)
	for time.Now().Before(deadline) {
		conn, err := net.DialTimeout("tcp", addr, time.Second)
		if err == nil {
			_ = conn.Close()
			return nil
		}
		time.Sleep(defaultWaitTick)
	}

	return fmt.Errorf("timed out waiting for tcp %s", addr)
}
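
// stopProcess sends an interrupt to cmd's process and waits for it to exit,
// killing it if it has not exited within 10 seconds.
func stopProcess(cmd *exec.Cmd) {
	if cmd == nil || cmd.Process == nil {
		return
	}

	_ = cmd.Process.Signal(os.Interrupt)
	done := make(chan error, 1)
	go func() {
		done <- cmd.Wait()
	}()

	select {
	case <-time.After(10 * time.Second):
		_ = cmd.Process.Kill()
		<-done
	case <-done:
	}
}

// newWorkDir creates a fresh temporary working directory. keepLogs is true
// when VOLUME_SERVER_IT_KEEP_LOGS=1 is set in the environment.
func newWorkDir() (dir string, keepLogs bool, err error) {
	keepLogs = os.Getenv("VOLUME_SERVER_IT_KEEP_LOGS") == "1"
	dir, err = os.MkdirTemp("", "seaweedfs_volume_server_it_")
	return dir, keepLogs, err
}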

// writeSecurityConfig writes security.toml into configDir. It emits the JWT
// signing sections when profile.EnableJWT is set and the [access] section when
// profile.EnableUIAccess is set; otherwise the file holds only a comment.
func writeSecurityConfig(configDir string, profile matrix.Profile) error {
	var b strings.Builder
	if profile.EnableJWT {
		if profile.JWTSigningKey == "" || profile.JWTReadKey == "" {
			return errors.New("jwt profile requires both write and read keys")
		}
		b.WriteString("[jwt.signing]\n")
		b.WriteString("key = \"")
		b.WriteString(profile.JWTSigningKey)
		b.WriteString("\"\n")
		b.WriteString("expires_after_seconds = 60\n\n")

		b.WriteString("[jwt.signing.read]\n")
		b.WriteString("key = \"")
		b.WriteString(profile.JWTReadKey)
		b.WriteString("\"\n")
		b.WriteString("expires_after_seconds = 60\n")
	}
	if profile.EnableUIAccess {
		if b.Len() > 0 {
			b.WriteString("\n")
		}
		b.WriteString("[access]\n")
		b.WriteString("ui = true\n")
	}
	if b.Len() == 0 {
		b.WriteString("# optional security config generated for integration tests\n")
	}
	return os.WriteFile(filepath.Join(configDir, "security.toml"), []byte(b.String()), 0o644)
}

// FindOrBuildWeedBinary returns an executable weed binary, building one when needed.
// WEED_BINARY, when set, must point at an executable file and is used as-is;
// otherwise the binary is built once per process from the repository's weed
// directory into a shared temp location.
func FindOrBuildWeedBinary() (string, error) {
	if fromEnv := os.Getenv("WEED_BINARY"); fromEnv != "" {
		if isExecutableFile(fromEnv) {
			return fromEnv, nil
		}
		return "", fmt.Errorf("WEED_BINARY is set but not executable: %s", fromEnv)
	}

	weedBinaryOnce.Do(func() {
		repoRoot := ""
		if _, file, _, ok := runtime.Caller(0); ok {
			repoRoot = filepath.Clean(filepath.Join(filepath.Dir(file), "..", "..", ".."))
		}
		if repoRoot == "" {
			weedBinaryErr = errors.New("unable to detect repository root")
			return
		}

		binDir := filepath.Join(os.TempDir(), "seaweedfs_volume_server_it_bin")
		if err := os.MkdirAll(binDir, 0o755); err != nil {
			weedBinaryErr = fmt.Errorf("create binary directory %s: %w", binDir, err)
			return
		}
		binPath := filepath.Join(binDir, "weed")

		cmd := exec.Command("go", "build", "-o", binPath, ".")
		cmd.Dir = filepath.Join(repoRoot, "weed")
		var out bytes.Buffer
		cmd.Stdout = &out
		cmd.Stderr = &out
		if err := cmd.Run(); err != nil {
			weedBinaryErr = fmt.Errorf("build weed binary: %w\n%s", err, out.String())
			return
		}
		if !isExecutableFile(binPath) {
			weedBinaryErr = fmt.Errorf("built weed binary is not executable: %s", binPath)
			return
		}
		weedBinaryPath = binPath
	})

	if weedBinaryErr != nil {
		return "", weedBinaryErr
	}
	return weedBinaryPath, nil
}

// isExecutableFile reports whether path exists, is not a directory, and has
// at least one execute permission bit set.
func isExecutableFile(path string) bool {
	info, err := os.Stat(path)
	if err != nil || info.IsDir() {
		return false
	}
	mode := info.Mode().Perm()
	return mode&0o111 != 0
}

// tailLog returns the last 40 lines of the named file in the logs directory,
// or an empty string if the file cannot be opened.
func (c *Cluster) tailLog(logName string) string {
	f, err := os.Open(filepath.Join(c.logsDir, logName))
	if err != nil {
		return ""
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	lines := make([]string, 0, 40)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
		if len(lines) > 40 {
			lines = lines[1:]
		}
	}
	return strings.Join(lines, "\n")
}

// MasterAddress returns the master's HTTP host:port.
func (c *Cluster) MasterAddress() string {
	return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.masterPort))
}

// VolumeAdminAddress returns the volume server's admin HTTP host:port.
func (c *Cluster) VolumeAdminAddress() string {
	return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePort))
}

// VolumePublicAddress returns the volume server's public HTTP host:port.
// It equals VolumeAdminAddress unless the profile sets SplitPublicPort.
func (c *Cluster) VolumePublicAddress() string {
	return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumePubPort))
}

// VolumeGRPCAddress returns the volume server's gRPC host:port.
func (c *Cluster) VolumeGRPCAddress() string {
	return net.JoinHostPort("127.0.0.1", strconv.Itoa(c.volumeGrpcPort))
}

// VolumeServerAddress returns SeaweedFS server address format: ip:httpPort.grpcPort
func (c *Cluster) VolumeServerAddress() string {
	return fmt.Sprintf("%s.%d", c.VolumeAdminAddress(), c.volumeGrpcPort)
}

// MasterURL returns the master's HTTP base URL.
func (c *Cluster) MasterURL() string {
	return "http://" + c.MasterAddress()
}

// VolumeAdminURL returns the volume server's admin HTTP base URL.
func (c *Cluster) VolumeAdminURL() string {
	return "http://" + c.VolumeAdminAddress()
}

// VolumePublicURL returns the volume server's public HTTP base URL.
func (c *Cluster) VolumePublicURL() string {
	return "http://" + c.VolumePublicAddress()
}

// BaseDir returns the cluster's temporary working directory.
func (c *Cluster) BaseDir() string {
	return c.baseDir
}

// VolumeDataDirs returns the data directories the volume server was started with.
// Index 0 corresponds to DiskLocation 0, index 1 to DiskLocation 1, and so on.
// Tests can scan these directories to verify where files physically landed.
// The returned slice is a copy, so callers may modify it freely.
func (c *Cluster) VolumeDataDirs() []string {
	return append([]string(nil), c.volumeDataDirs...)
}