Files
seaweedfs/test/erasure_coding/admin_dockertest/ec_integration_test.go
Chris Lu 6cab199400 fix(iceberg): dial filer gRPC address verbatim in plugin worker (#9527)
* fix(iceberg): dial filer gRPC address verbatim in plugin worker

dialFiler was running its address argument through pb.ServerAddress.ToGrpcAddress,
whose single-port fallback adds +10000 to any host:port — so when the admin
forwards ClusterContext.FilerGrpcAddresses (already host:grpcPort) to the worker,
the iceberg handler turns the real gRPC port (e.g. 18888) into a non-existent
28888 and dispatched jobs fail with connection refused.

Drop the conversion; the address is already dialable. Tests that produced fake
filer addresses in dual-port form now return host:grpcPort to match the new
contract.

* test(ec): use renamed detection_interval_minutes field

The admin_runtime.detection_interval_seconds field was renamed to
detection_interval_minutes back in May. This integration test was not
updated, so the unknown JSON field was silently ignored and the scheduler
fell back to the default detection interval (17 min for erasure_coding),
which exceeds the test's 5-minute wait and times out.

Switch to detection_interval_minutes: 1 — local run completes in ~120s.
2026-05-17 23:03:00 -07:00

504 lines
15 KiB
Go

package admin_dockertest
import (
"bytes"
crand "crypto/rand"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"sync"
"testing"
"time"
)
// Base URLs of the locally started SeaweedFS services this test talks to.
// The ports match the -port flags that ensureEnvironment passes to the
// corresponding subprocesses.
const (
// AdminUrl is the base URL of the admin server (started with -port=23646).
AdminUrl = "http://localhost:23646"
// MasterUrl is the base URL of the master server (started with -port=9333).
MasterUrl = "http://localhost:9333"
// FilerUrl is the base URL of the filer (started with -port=8888).
FilerUrl = "http://localhost:8888"
)
// Registry of the background subprocesses started by startWeed, kept so that
// cleanup can kill whatever is still running when the test ends.
var (
// runningCmds holds every started command that has not been removed again
// by stopWeed.
runningCmds []*exec.Cmd
// runningCmdsLock guards runningCmds so parallel subprocess startup can
// append safely.
runningCmdsLock sync.Mutex
)
// cleanup kills every subprocess that is still registered in runningCmds.
// Commands that never got a process handle are skipped. The registry itself
// is left untouched and kill errors are ignored.
func cleanup() {
	runningCmdsLock.Lock()
	defer runningCmdsLock.Unlock()

	for i := range runningCmds {
		proc := runningCmds[i].Process
		if proc == nil {
			continue
		}
		proc.Kill()
	}
}
// startWeed launches the weed binary as a background subprocess and registers
// it in runningCmds so cleanup can kill it later.
//
// name is used for the log file (<cwd>/tmp/logs/<name>.log) that receives the
// subprocess's stdout and stderr; args are passed to the binary unchanged.
// The binary is expected at <cwd>/../../../weed_bin and the subprocess runs
// with the test directory as its working directory so relative ./tmp paths
// resolve there.
//
// Any setup or start failure is reported through t.Fatalf. The function does
// not wait for the subprocess to become ready.
func startWeed(t *testing.T, name string, args ...string) *exec.Cmd {
	wd, err := os.Getwd()
	if err != nil {
		t.Fatalf("Failed to get working directory: %v", err)
	}

	// Create logs dir in local ./tmp
	logDir := filepath.Join(wd, "tmp", "logs")
	if err := os.MkdirAll(logDir, 0755); err != nil {
		t.Fatalf("Failed to create log dir %s: %v", logDir, err)
	}
	logFile, err := os.Create(filepath.Join(logDir, name+".log"))
	if err != nil {
		t.Fatalf("Failed to create log file: %v", err)
	}
	// The child receives its own copy of the descriptor when it is started,
	// so the parent's handle can be released once this function returns
	// (including the t.Fatalf paths, which still run deferred calls).
	defer logFile.Close()

	// assume "weed_bin" binary is in project root.
	rootDir := filepath.Dir(filepath.Dir(filepath.Dir(wd)))
	cmd := exec.Command(filepath.Join(rootDir, "weed_bin"), args...)
	cmd.Stdout = logFile
	cmd.Stderr = logFile
	// Set Cwd to test directory so it finds local ./tmp
	cmd.Dir = wd

	if err := cmd.Start(); err != nil {
		t.Fatalf("Failed to start weed %v: %v", args, err)
	}

	runningCmdsLock.Lock()
	runningCmds = append(runningCmds, cmd)
	runningCmdsLock.Unlock()
	return cmd
}
func stopWeed(t *testing.T, cmd *exec.Cmd) {
if cmd != nil && cmd.Process != nil {
t.Logf("Stopping process %d", cmd.Process.Pid)
cmd.Process.Kill()
cmd.Wait()
// Remove from runningCmds to avoid double kill in cleanup
for i, c := range runningCmds {
if c == cmd {
runningCmds = append(runningCmds[:i], runningCmds[i+1:]...)
break
}
}
}
}
// ensureEnvironment builds the weed binary and boots a local cluster for the
// test: one master, 14 volume servers, one filer, two maintenance workers and
// the admin server. All state lives under ./tmp, which is wiped first.
//
// The function blocks until master, filer and admin answer their HTTP probes
// and fails the test via t.Fatalf on any build, filesystem or startup error.
func ensureEnvironment(t *testing.T) {
	// 1. Build weed binary
	wd, err := os.Getwd()
	if err != nil {
		t.Fatalf("Failed to get working directory: %v", err)
	}
	rootDir := filepath.Dir(filepath.Dir(filepath.Dir(wd))) // Up 3 levels
	buildCmd := exec.Command("go", "build", "-o", "weed_bin", "./weed")
	buildCmd.Dir = rootDir
	buildCmd.Stdout = os.Stdout
	buildCmd.Stderr = os.Stderr
	if err := buildCmd.Run(); err != nil {
		t.Fatalf("Failed to build weed: %v", err)
	}
	t.Log("Successfully built weed binary")

	// 2. Start Master
	// Use local ./tmp/master
	if err := os.RemoveAll("tmp"); err != nil {
		t.Fatalf("Failed to remove stale tmp dir: %v", err)
	}
	if err := os.MkdirAll(filepath.Join("tmp", "master"), 0755); err != nil {
		t.Fatalf("Failed to create tmp dir: %v", err)
	}
	startWeed(t, "master", "master", "-mdir=./tmp/master", "-port=9333", "-ip=localhost", "-peers=none", "-volumeSizeLimitMB=100")

	// Wait for master
	waitForUrl(t, MasterUrl+"/cluster/status", 10)

	// 3. Start Volume Server (Worker)
	// Start 14 volume servers to verify RS(10,4) default EC. Fork/exec in
	// parallel because startWeed is non-blocking and the per-process fork +
	// mkdir + log-file-open overhead stacks up sequentially on cold CI
	// disks, eating most of the admin /health wait budget further down.
	var volWg sync.WaitGroup
	for i := 1; i <= 14; i++ {
		volWg.Add(1)
		go func(i int) {
			defer volWg.Done()
			volName := fmt.Sprintf("volume%d", i)
			port := 8080 + i - 1
			dir := filepath.Join("tmp", volName)
			if err := os.MkdirAll(dir, 0755); err != nil {
				// Errorf (not Fatalf) because this is not the test goroutine.
				t.Errorf("Failed to create dir %s: %v", dir, err)
				return
			}
			startWeed(t, volName, "volume", "-dir="+dir, "-mserver=localhost:9333", fmt.Sprintf("-port=%d", port), "-ip=localhost")
		}(i)
	}
	volWg.Wait()
	// A failure inside one of the goroutines above (including a t.Fatalf
	// raised by startWeed) only terminates that goroutine. Stop the test
	// here, on the test goroutine, instead of continuing with a partial
	// cluster.
	if t.Failed() {
		t.FailNow()
	}

	// 4. Start Filer
	if err := os.MkdirAll(filepath.Join("tmp", "filer"), 0755); err != nil {
		t.Fatalf("Failed to create filer dir: %v", err)
	}
	startWeed(t, "filer", "filer", "-defaultStoreDir=./tmp/filer", "-master=localhost:9333", "-port=8888", "-ip=localhost")
	waitForUrl(t, FilerUrl+"/", 60)

	// 5. Start Workers (Maintenance)
	// We need workers to execute EC tasks
	for i := 1; i <= 2; i++ {
		workerName := fmt.Sprintf("worker%d", i)
		metricsPort := 9327 + i - 1
		debugPort := 6060 + i
		dir, err := filepath.Abs(filepath.Join("tmp", workerName))
		if err != nil {
			t.Fatalf("Failed to resolve dir for %s: %v", workerName, err)
		}
		if err := os.MkdirAll(dir, 0755); err != nil {
			t.Fatalf("Failed to create dir %s: %v", dir, err)
		}
		startWeed(t, workerName, "worker", "-admin=localhost:23646", "-workingDir="+dir, fmt.Sprintf("-metricsPort=%d", metricsPort), fmt.Sprintf("-debug.port=%d", debugPort))
	}

	// 6. Start Admin
	adminDir := filepath.Join("tmp", "admin")
	if err := os.RemoveAll(adminDir); err != nil {
		t.Fatalf("Failed to remove stale admin dir: %v", err)
	}
	if err := os.MkdirAll(adminDir, 0755); err != nil {
		t.Fatalf("Failed to create admin dir: %v", err)
	}
	startWeed(t, "admin", "admin", "-master=localhost:9333", "-port=23646", "-dataDir=./tmp/admin")

	// Admin is started after master, 14 volume servers, filer and 2 workers,
	// so under cold CI conditions the wait here has to absorb the tail of
	// every earlier subprocess coming up. 60s is too tight and has flaked;
	// 180s gives comfortable headroom without meaningfully extending the
	// fast path (the first successful /health usually hits well under 30s).
	waitForUrl(t, AdminUrl+"/health", 180)
	t.Log("Environment started successfully")
}
// waitForUrl polls url with HTTP GET until it answers 200 OK, making at most
// retries attempts and sleeping one second after each unsuccessful attempt.
// If no attempt succeeds the test is failed with t.Fatalf.
func waitForUrl(t *testing.T, url string, retries int) {
	t.Helper()
	for i := 0; i < retries; i++ {
		resp, err := http.Get(url)
		if err == nil {
			// Close the body on every response, not only on success, so
			// non-200 answers during startup do not leak connections.
			ready := resp.StatusCode == http.StatusOK
			resp.Body.Close()
			if ready {
				return
			}
		}
		time.Sleep(1 * time.Second)
	}
	t.Fatalf("Timeout waiting for %s", url)
}
// fetchJSON issues an HTTP GET against url and JSON-decodes the response
// body into out. A transport error is returned as-is; any status other than
// 200 OK yields an error of the form "status <code>: <body>".
func fetchJSON(url string, out interface{}) error {
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code == http.StatusOK {
		decoder := json.NewDecoder(resp.Body)
		return decoder.Decode(out)
	}

	// Best effort: include whatever part of the error body could be read.
	payload, _ := io.ReadAll(resp.Body)
	return fmt.Errorf("status %d: %s", resp.StatusCode, string(payload))
}
// mapField looks up key in obj and reports whether it is present. A nil map
// or a missing key yields (nil, false); a key that is present with a nil
// value yields (nil, true).
func mapField(obj map[string]interface{}, key string) (interface{}, bool) {
	// Indexing a nil map is safe in Go and simply reports "not found".
	value, found := obj[key]
	if !found {
		return nil, false
	}
	return value, true
}
// mapFieldAny tries each of keys in order and returns the value of the first
// one present in obj. It returns (nil, false) when none of the keys exists
// or when no keys are given.
func mapFieldAny(obj map[string]interface{}, keys ...string) (interface{}, bool) {
	for i := 0; i < len(keys); i++ {
		value, found := mapField(obj, keys[i])
		if found {
			return value, true
		}
	}
	return nil, false
}
// ecShardsPattern matches every `"EcShards": <n>` counter in the master's
// /dir/status response. It is compiled once here instead of on every polling
// iteration.
var ecShardsPattern = regexp.MustCompile(`"EcShards":\s*(\d+)`)

// TestEcEndToEnd boots a local cluster and checks the erasure-coding path end
// to end:
//
//  1. disable the volume_balance job type and enable erasure_coding with very
//     low thresholds through the admin plugin API,
//  2. upload a 5 MiB random file to the filer,
//  3. poll the master's /dir/status for up to 300 seconds until it reports a
//     non-zero number of EC shards, logging plugin diagnostics while waiting,
//  4. read the file back and compare it byte for byte with what was uploaded.
//
// All subprocesses are killed by cleanup when the test returns.
func TestEcEndToEnd(t *testing.T) {
	defer cleanup()
	ensureEnvironment(t)

	// Bound every request so a hung server cannot stall the test forever.
	client := &http.Client{Timeout: 60 * time.Second}

	// 1. Configure plugin job types for fast EC detection/execution.
	t.Log("Configuring plugin job types via API...")

	// Disable volume balance to reduce interference for this EC-focused test.
	putJobTypeConfig(t, client, "volume_balance", map[string]interface{}{
		"job_type": "volume_balance",
		"admin_runtime": map[string]interface{}{
			"enabled": false,
		},
	})

	putJobTypeConfig(t, client, "erasure_coding", map[string]interface{}{
		"job_type": "erasure_coding",
		"admin_runtime": map[string]interface{}{
			"enabled":                          true,
			"detection_interval_minutes":       1,
			"global_execution_concurrency":     4,
			"per_worker_execution_concurrency": 4,
			"max_jobs_per_detection":           100,
		},
		"worker_config_values": map[string]interface{}{
			"quiet_for_seconds": map[string]interface{}{
				"int64_value": "1",
			},
			"min_interval_seconds": map[string]interface{}{
				"int64_value": "1",
			},
			"min_size_mb": map[string]interface{}{
				"int64_value": "1",
			},
			"fullness_ratio": map[string]interface{}{
				"double_value": 0.0001,
			},
		},
	})

	// 2. Upload a file
	fileSize := 5 * 1024 * 1024
	data := make([]byte, fileSize)
	if _, err := crand.Read(data); err != nil {
		t.Fatalf("Failed to generate random file content: %v", err)
	}
	fileName := fmt.Sprintf("ec_test_file_%d", time.Now().Unix())
	t.Logf("Uploading %d bytes file %s to Filer...", fileSize, fileName)
	uploadUrl := FilerUrl + "/" + fileName
	if err := uploadFileWithRetry(t, client, uploadUrl, data); err != nil {
		t.Fatalf("Failed to upload file after retries: %v", err)
	}
	t.Log("Upload successful")

	// 3. Verify EC Encoding
	t.Log("Waiting for EC encoding (checking Master topology)...")
	startTime := time.Now()
	ecVerified := false
	var lastBody []byte
	debugTick := 0
	for time.Since(startTime) < 300*time.Second {
		// 3.1 Check Master Topology
		resp, err := client.Get(MasterUrl + "/dir/status")
		if err == nil {
			lastBody, _ = io.ReadAll(resp.Body)
			resp.Body.Close()
			// Check total EC shards
			if totalShards := countEcShards(lastBody); totalShards > 0 {
				t.Logf("EC encoding verified (found %d total EcShards in topology) after %d seconds", totalShards, int(time.Since(startTime).Seconds()))
				ecVerified = true
				break
			}
		}

		// 3.2 Debug: Check workers, jobs, and scheduler status
		debugTick++
		logEcProgress(t)
		// The detailed dump is noisier, so only emit it every third tick.
		if debugTick%3 == 0 {
			logEcSchedulerDetails(t)
		}
		time.Sleep(10 * time.Second)
	}
	if !ecVerified {
		dumpLogs(t)
		t.Fatalf("Timed out waiting for EC encoding verified in Topology. Last body: %s", string(lastBody))
	}

	// 4. Verification: Read back the file
	t.Log("Reading back file...")
	resp, err := client.Get(uploadUrl)
	if err != nil {
		dumpLogs(t)
		t.Fatalf("Failed to read back file: %v", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != 200 {
		dumpLogs(t)
		t.Fatalf("Read back failed status: %d", resp.StatusCode)
	}
	content, err := io.ReadAll(resp.Body)
	if err != nil {
		dumpLogs(t)
		t.Fatalf("Failed to read back file body: %v", err)
	}
	if len(content) != fileSize {
		dumpLogs(t)
		t.Fatalf("Read back size mismatch: got %d, want %d", len(content), fileSize)
	}
	// Verify byte-wise content equality
	if !bytes.Equal(content, data) {
		dumpLogs(t)
		t.Fatalf("Read back content mismatch: uploaded and downloaded data differ")
	}
	t.Log("Test PASS: EC encoding and read back successful!")
}

// putJobTypeConfig PUTs config as JSON to the admin plugin API endpoint of
// jobType and fails the test unless the admin answers 200 OK.
func putJobTypeConfig(t *testing.T, client *http.Client, jobType string, config map[string]interface{}) {
	t.Helper()
	jsonBody, err := json.Marshal(config)
	if err != nil {
		t.Fatalf("Failed to marshal %s config: %v", jobType, err)
	}
	req, err := http.NewRequest("PUT", AdminUrl+"/api/plugin/job-types/"+jobType+"/config", bytes.NewReader(jsonBody))
	if err != nil {
		t.Fatalf("Failed to create %s config request: %v", jobType, err)
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := client.Do(req)
	if err != nil {
		t.Fatalf("Failed to update %s config: %v", jobType, err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		t.Fatalf("Failed to update %s config (status %d): %s", jobType, resp.StatusCode, string(body))
	}
}

// uploadFileWithRetry PUTs data to uploadUrl, trying up to 10 times with a
// two second pause after each failed attempt. It returns nil as soon as the
// server answers 201, otherwise the error of the last attempt.
func uploadFileWithRetry(t *testing.T, client *http.Client, uploadUrl string, data []byte) error {
	t.Helper()
	var uploadErr error
	for i := 0; i < 10; i++ {
		req, err := http.NewRequest("PUT", uploadUrl, bytes.NewReader(data))
		if err != nil {
			uploadErr = err
			t.Logf("Upload attempt %d failed to create request: %v", i+1, err)
			time.Sleep(2 * time.Second)
			continue
		}
		resp, err := client.Do(req)
		if err != nil {
			uploadErr = err
		} else {
			if resp.StatusCode == http.StatusCreated {
				resp.Body.Close()
				return nil
			}
			body, _ := io.ReadAll(resp.Body)
			resp.Body.Close()
			uploadErr = fmt.Errorf("status %d: %s", resp.StatusCode, string(body))
		}
		t.Logf("Upload attempt %d failed: %v", i+1, uploadErr)
		time.Sleep(2 * time.Second)
	}
	return uploadErr
}

// countEcShards sums every "EcShards" counter found in a /dir/status response
// body. It returns 0 when the body contains no such counter.
func countEcShards(body []byte) int {
	total := 0
	for _, m := range ecShardsPattern.FindAllSubmatch(body, -1) {
		var count int
		// The pattern only captures digits, so this can only fail on a value
		// that does not fit an int; such a counter is skipped.
		if _, err := fmt.Sscanf(string(m[1]), "%d", &count); err != nil {
			continue
		}
		total += count
	}
	return total
}

// logEcProgress logs a one-line summary of the registered plugin workers and
// the known jobs, as reported by the admin plugin API. A failed request is
// not an error here; the corresponding counters simply stay at zero.
func logEcProgress(t *testing.T) {
	var workers []map[string]interface{}
	workerCount := 0
	ecDetectorCount := 0
	ecExecutorCount := 0
	if err := fetchJSON(AdminUrl+"/api/plugin/workers", &workers); err == nil {
		workerCount = len(workers)
		for _, worker := range workers {
			capsValue, ok := mapFieldAny(worker, "capabilities", "Capabilities")
			if !ok {
				continue
			}
			caps, ok := capsValue.(map[string]interface{})
			if !ok {
				continue
			}
			if capValue, ok := caps["erasure_coding"].(map[string]interface{}); ok {
				if capValue["can_detect"] == true {
					ecDetectorCount++
				}
				if capValue["can_execute"] == true {
					ecExecutorCount++
				}
			}
		}
	}

	var tasks []map[string]interface{}
	taskCount := 0
	ecTaskCount := 0
	ecTaskStates := map[string]int{}
	if err := fetchJSON(AdminUrl+"/api/plugin/jobs?limit=1000", &tasks); err == nil {
		taskCount = len(tasks)
		for _, task := range tasks {
			jobType, _ := task["job_type"].(string)
			state, _ := task["state"].(string)
			if jobType == "erasure_coding" {
				ecTaskCount++
				ecTaskStates[state]++
			}
		}
	}

	t.Logf("Waiting for EC... (Workers: %d det=%d exec=%d, Tasks: %d ec=%d, EC States: %+v)",
		workerCount, ecDetectorCount, ecExecutorCount, taskCount, ecTaskCount, ecTaskStates)
}

// logEcSchedulerDetails logs the plugin status, scheduler status, the
// erasure_coding scheduler state, the registered job types and the most
// recent erasure_coding activities. Each section is skipped silently when its
// request fails.
func logEcSchedulerDetails(t *testing.T) {
	var pluginStatus map[string]interface{}
	if err := fetchJSON(AdminUrl+"/api/plugin/status", &pluginStatus); err == nil {
		t.Logf("Plugin status: enabled=%v worker_count=%v worker_grpc_port=%v configured=%v",
			pluginStatus["enabled"], pluginStatus["worker_count"], pluginStatus["worker_grpc_port"], pluginStatus["configured"])
	}

	var schedulerStatus map[string]interface{}
	if err := fetchJSON(AdminUrl+"/api/plugin/scheduler-status", &schedulerStatus); err == nil {
		if schedValue, ok := schedulerStatus["scheduler"].(map[string]interface{}); ok {
			t.Logf("Scheduler status: current_job_type=%v phase=%v last_iteration_had_jobs=%v idle_sleep_seconds=%v last_iteration_done_at=%v next_detection_at=%v",
				schedValue["current_job_type"], schedValue["current_phase"],
				schedValue["last_iteration_had_jobs"], schedValue["idle_sleep_seconds"], schedValue["last_iteration_done_at"], schedValue["next_detection_at"])
		} else {
			t.Logf("Scheduler status: %v", schedulerStatus)
		}
	}

	var schedulerStates []map[string]interface{}
	if err := fetchJSON(AdminUrl+"/api/plugin/scheduler-states", &schedulerStates); err == nil {
		for _, state := range schedulerStates {
			if state["job_type"] == "erasure_coding" {
				t.Logf("EC scheduler state: enabled=%v detection_in_flight=%v detector_available=%v executor_workers=%v next_detection_at=%v last_run_status=%v last_run_started_at=%v last_run_completed_at=%v",
					state["enabled"], state["detection_in_flight"], state["detector_available"],
					state["executor_worker_count"], state["next_detection_at"], state["last_run_status"],
					state["last_run_started_at"], state["last_run_completed_at"])
				break
			}
		}
	}

	var jobTypes []map[string]interface{}
	if err := fetchJSON(AdminUrl+"/api/plugin/job-types", &jobTypes); err == nil {
		var names []string
		for _, jobType := range jobTypes {
			if name, ok := jobType["job_type"].(string); ok && name != "" {
				names = append(names, name)
			}
		}
		t.Logf("Plugin job types: %v", names)
	}

	var activities []map[string]interface{}
	if err := fetchJSON(AdminUrl+"/api/plugin/activities?job_type=erasure_coding&limit=5", &activities); err == nil {
		// Log in reverse of the order the API returned them.
		for i := len(activities) - 1; i >= 0; i-- {
			act := activities[i]
			t.Logf("EC activity: stage=%v message=%v occurred_at=%v", act["stage"], act["message"], act["occurred_at"])
		}
	}
}
// dumpLogs writes the content of every *.log file in <cwd>/tmp/logs to the
// test log, to help diagnose a failure. It is best effort: problems locating
// or reading the logs are themselves logged and never fail the test.
func dumpLogs(t *testing.T) {
	wd, err := os.Getwd()
	if err != nil {
		t.Logf("dumpLogs: cannot determine working directory: %v", err)
		return
	}
	logDir := filepath.Join(wd, "tmp", "logs")

	entries, err := os.ReadDir(logDir)
	if err != nil {
		// ReadDir may still have returned the entries it managed to read,
		// so report the problem and carry on with those.
		t.Logf("dumpLogs: cannot list %s: %v", logDir, err)
	}
	for _, entry := range entries {
		name := entry.Name()
		if entry.IsDir() || !strings.HasSuffix(name, ".log") {
			continue
		}
		content, err := os.ReadFile(filepath.Join(logDir, name))
		if err != nil {
			// Make an unreadable log distinguishable from an empty one.
			t.Logf("dumpLogs: cannot read %s: %v", name, err)
			continue
		}
		t.Logf("--- LOG DUMP: %s ---\n%s\n--- END LOG ---", name, string(content))
	}
}