mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-22 01:31:34 +00:00
* fix(iceberg): dial filer gRPC address verbatim in plugin worker. dialFiler was running its address argument through pb.ServerAddress.ToGrpcAddress, whose single-port fallback adds +10000 to any host:port — so when the admin forwards ClusterContext.FilerGrpcAddresses (already host:grpcPort) to the worker, the iceberg handler turns the real gRPC port (e.g. 18888) into a non-existent 28888 and dispatched jobs fail with connection refused. Drop the conversion; the address is already dialable. Tests that produced fake filer addresses in dual-port form now return host:grpcPort to match the new contract. * test(ec): use renamed detection_interval_minutes field. The admin_runtime.detection_interval_seconds field was renamed to detection_interval_minutes back in May. This integration test was not updated, so the unknown JSON field was silently ignored and the scheduler fell back to the default detection interval (17 min for erasure_coding); that exceeds the test's 5-minute wait, so the test timed out. Switch to detection_interval_minutes: 1 — a local run completes in ~120s.
504 lines
15 KiB
Go
504 lines
15 KiB
Go
package admin_dockertest
|
|
|
|
import (
|
|
"bytes"
|
|
crand "crypto/rand"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"regexp"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
)
|
|
|
|
// HTTP endpoints of the locally spawned SeaweedFS processes used by this test.
const (
	// MasterUrl is the master server (launched with -port=9333).
	MasterUrl = "http://localhost:9333"
	// FilerUrl is the filer (launched with -port=8888).
	FilerUrl = "http://localhost:8888"
	// AdminUrl is the admin server and its plugin API (launched with -port=23646).
	AdminUrl = "http://localhost:23646"
)
|
|
|
|
// runningCmds records every weed subprocess launched by startWeed so that
// cleanup can kill whatever is still alive when the test ends.
var runningCmds []*exec.Cmd

// runningCmdsLock guards runningCmds. startWeed appends to the slice from
// several goroutines at once while the volume servers are launched in
// parallel, so every access must hold this lock.
var runningCmdsLock sync.Mutex
|
|
|
|
func cleanup() {
|
|
runningCmdsLock.Lock()
|
|
defer runningCmdsLock.Unlock()
|
|
for _, cmd := range runningCmds {
|
|
if cmd.Process != nil {
|
|
cmd.Process.Kill()
|
|
}
|
|
}
|
|
}
|
|
|
|
func startWeed(t *testing.T, name string, args ...string) *exec.Cmd {
|
|
cmd := exec.Command("./weed_bin", args...)
|
|
|
|
// Create logs dir in local ./tmp
|
|
wd, _ := os.Getwd()
|
|
logDir := filepath.Join(wd, "tmp", "logs")
|
|
os.MkdirAll(logDir, 0755)
|
|
|
|
logFile, err := os.Create(filepath.Join(logDir, name+".log"))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create log file: %v", err)
|
|
}
|
|
|
|
cmd.Stdout = logFile
|
|
cmd.Stderr = logFile
|
|
// Set Cwd to test directory so it finds local ./tmp
|
|
cmd.Dir = wd
|
|
|
|
// assume "weed_bin" binary is in project root.
|
|
rootDir := filepath.Dir(filepath.Dir(filepath.Dir(wd)))
|
|
cmd.Path = filepath.Join(rootDir, "weed_bin")
|
|
|
|
err = cmd.Start()
|
|
if err != nil {
|
|
t.Fatalf("Failed to start weed %v: %v", args, err)
|
|
}
|
|
runningCmdsLock.Lock()
|
|
runningCmds = append(runningCmds, cmd)
|
|
runningCmdsLock.Unlock()
|
|
return cmd
|
|
}
|
|
|
|
func stopWeed(t *testing.T, cmd *exec.Cmd) {
|
|
if cmd != nil && cmd.Process != nil {
|
|
t.Logf("Stopping process %d", cmd.Process.Pid)
|
|
cmd.Process.Kill()
|
|
cmd.Wait()
|
|
|
|
// Remove from runningCmds to avoid double kill in cleanup
|
|
for i, c := range runningCmds {
|
|
if c == cmd {
|
|
runningCmds = append(runningCmds[:i], runningCmds[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func ensureEnvironment(t *testing.T) {
|
|
// 1. Build weed binary
|
|
wd, _ := os.Getwd()
|
|
rootDir := filepath.Dir(filepath.Dir(filepath.Dir(wd))) // Up 3 levels
|
|
|
|
buildCmd := exec.Command("go", "build", "-o", "weed_bin", "./weed")
|
|
buildCmd.Dir = rootDir
|
|
buildCmd.Stdout = os.Stdout
|
|
buildCmd.Stderr = os.Stderr
|
|
if err := buildCmd.Run(); err != nil {
|
|
t.Fatalf("Failed to build weed: %v", err)
|
|
}
|
|
t.Log("Successfully built weed binary")
|
|
|
|
// 2. Start Master
|
|
// Use local ./tmp/master
|
|
os.RemoveAll("tmp")
|
|
err := os.MkdirAll(filepath.Join("tmp", "master"), 0755)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create tmp dir: %v", err)
|
|
}
|
|
|
|
startWeed(t, "master", "master", "-mdir=./tmp/master", "-port=9333", "-ip=localhost", "-peers=none", "-volumeSizeLimitMB=100")
|
|
|
|
// Wait for master
|
|
waitForUrl(t, MasterUrl+"/cluster/status", 10)
|
|
|
|
// 3. Start Volume Server (Worker)
|
|
// Start 14 volume servers to verify RS(10,4) default EC. Fork/exec in
|
|
// parallel because startWeed is non-blocking and the per-process fork +
|
|
// mkdir + log-file-open overhead stacks up sequentially on cold CI
|
|
// disks, eating most of the admin /health wait budget further down.
|
|
var volWg sync.WaitGroup
|
|
for i := 1; i <= 14; i++ {
|
|
volWg.Add(1)
|
|
go func(i int) {
|
|
defer volWg.Done()
|
|
volName := fmt.Sprintf("volume%d", i)
|
|
port := 8080 + i - 1
|
|
dir := filepath.Join("tmp", volName)
|
|
os.MkdirAll(dir, 0755)
|
|
startWeed(t, volName, "volume", "-dir="+dir, "-mserver=localhost:9333", fmt.Sprintf("-port=%d", port), "-ip=localhost")
|
|
}(i)
|
|
}
|
|
volWg.Wait()
|
|
|
|
// 4. Start Filer
|
|
os.MkdirAll(filepath.Join("tmp", "filer"), 0755)
|
|
startWeed(t, "filer", "filer", "-defaultStoreDir=./tmp/filer", "-master=localhost:9333", "-port=8888", "-ip=localhost")
|
|
waitForUrl(t, FilerUrl+"/", 60)
|
|
|
|
// 5. Start Workers (Maintenance)
|
|
// We need workers to execute EC tasks
|
|
for i := 1; i <= 2; i++ {
|
|
workerName := fmt.Sprintf("worker%d", i)
|
|
metricsPort := 9327 + i - 1
|
|
debugPort := 6060 + i
|
|
dir, _ := filepath.Abs(filepath.Join("tmp", workerName))
|
|
os.MkdirAll(dir, 0755)
|
|
startWeed(t, workerName, "worker", "-admin=localhost:23646", "-workingDir="+dir, fmt.Sprintf("-metricsPort=%d", metricsPort), fmt.Sprintf("-debug.port=%d", debugPort))
|
|
}
|
|
|
|
// 6. Start Admin
|
|
os.RemoveAll(filepath.Join("tmp", "admin"))
|
|
os.MkdirAll(filepath.Join("tmp", "admin"), 0755)
|
|
startWeed(t, "admin", "admin", "-master=localhost:9333", "-port=23646", "-dataDir=./tmp/admin")
|
|
// Admin is started after master, 14 volume servers, filer and 2 workers,
|
|
// so under cold CI conditions the wait here has to absorb the tail of
|
|
// every earlier subprocess coming up. 60s is too tight and has flaked;
|
|
// 180s gives comfortable headroom without meaningfully extending the
|
|
// fast path (the first successful /health usually hits well under 30s).
|
|
waitForUrl(t, AdminUrl+"/health", 180)
|
|
|
|
t.Log("Environment started successfully")
|
|
}
|
|
|
|
// waitForUrl polls url with GET roughly once per second, for at most retries
// attempts. It returns as soon as a response has status 200 OK. If no attempt
// succeeds, it fails the test with t.Fatalf.
func waitForUrl(t *testing.T, url string, retries int) {
	t.Helper()

	// Bound each probe so a server that accepts the connection but never
	// answers cannot stall the loop beyond its retry budget.
	client := &http.Client{Timeout: 5 * time.Second}
	for i := 0; i < retries; i++ {
		resp, err := client.Get(url)
		if err == nil {
			// Close the body of every response, not only 200s, so non-OK
			// probes do not leak connections.
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return
			}
		}
		time.Sleep(1 * time.Second)
	}
	t.Fatalf("Timeout waiting for %s", url)
}
|
|
|
|
// fetchJSON performs an HTTP GET on url and JSON-decodes a 200 OK response
// body into out. For any other status it returns an error of the form
// "status <code>: <body>" so callers can see what the server replied.
func fetchJSON(url string, out interface{}) error {
	res, getErr := http.Get(url)
	if getErr != nil {
		return getErr
	}
	defer res.Body.Close()

	switch res.StatusCode {
	case http.StatusOK:
		return json.NewDecoder(res.Body).Decode(out)
	default:
		payload, _ := io.ReadAll(res.Body)
		return fmt.Errorf("status %d: %s", res.StatusCode, string(payload))
	}
}
|
|
|
|
// mapField returns obj[key] and whether key is present. A nil obj behaves
// like an empty map, which is how Go map indexing already works.
func mapField(obj map[string]interface{}, key string) (interface{}, bool) {
	value, present := obj[key]
	return value, present
}
|
|
|
|
// mapFieldAny tries each of keys against obj in order. It returns the value
// of the first key that is present, even when that value is nil. It reports
// false when obj is nil or none of the keys exist.
func mapFieldAny(obj map[string]interface{}, keys ...string) (interface{}, bool) {
	for _, k := range keys {
		// Indexing a nil map is safe in Go and simply reports absence.
		if v, found := obj[k]; found {
			return v, true
		}
	}
	return nil, false
}
|
|
|
|
func TestEcEndToEnd(t *testing.T) {
|
|
defer cleanup()
|
|
ensureEnvironment(t)
|
|
|
|
client := &http.Client{}
|
|
|
|
// 1. Configure plugin job types for fast EC detection/execution.
|
|
t.Log("Configuring plugin job types via API...")
|
|
|
|
// Disable volume balance to reduce interference for this EC-focused test.
|
|
balanceConfig := map[string]interface{}{
|
|
"job_type": "volume_balance",
|
|
"admin_runtime": map[string]interface{}{
|
|
"enabled": false,
|
|
},
|
|
}
|
|
jsonBody, err := json.Marshal(balanceConfig)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal volume_balance config: %v", err)
|
|
}
|
|
req, err := http.NewRequest("PUT", AdminUrl+"/api/plugin/job-types/volume_balance/config", bytes.NewBuffer(jsonBody))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create volume_balance config request: %v", err)
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
t.Fatalf("Failed to update volume_balance config: %v", err)
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
t.Fatalf("Failed to update volume_balance config (status %d): %s", resp.StatusCode, string(body))
|
|
}
|
|
resp.Body.Close()
|
|
|
|
ecConfig := map[string]interface{}{
|
|
"job_type": "erasure_coding",
|
|
"admin_runtime": map[string]interface{}{
|
|
"enabled": true,
|
|
"detection_interval_minutes": 1,
|
|
"global_execution_concurrency": 4,
|
|
"per_worker_execution_concurrency": 4,
|
|
"max_jobs_per_detection": 100,
|
|
},
|
|
"worker_config_values": map[string]interface{}{
|
|
"quiet_for_seconds": map[string]interface{}{
|
|
"int64_value": "1",
|
|
},
|
|
"min_interval_seconds": map[string]interface{}{
|
|
"int64_value": "1",
|
|
},
|
|
"min_size_mb": map[string]interface{}{
|
|
"int64_value": "1",
|
|
},
|
|
"fullness_ratio": map[string]interface{}{
|
|
"double_value": 0.0001,
|
|
},
|
|
},
|
|
}
|
|
jsonBody, err = json.Marshal(ecConfig)
|
|
if err != nil {
|
|
t.Fatalf("Failed to marshal erasure_coding config: %v", err)
|
|
}
|
|
req, err = http.NewRequest("PUT", AdminUrl+"/api/plugin/job-types/erasure_coding/config", bytes.NewBuffer(jsonBody))
|
|
if err != nil {
|
|
t.Fatalf("Failed to create erasure_coding config request: %v", err)
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
resp, err = client.Do(req)
|
|
if err != nil {
|
|
t.Fatalf("Failed to update erasure_coding config: %v", err)
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
body, _ := io.ReadAll(resp.Body)
|
|
t.Fatalf("Failed to update erasure_coding config (status %d): %s", resp.StatusCode, string(body))
|
|
}
|
|
resp.Body.Close()
|
|
|
|
// 2. Upload a file
|
|
fileSize := 5 * 1024 * 1024
|
|
data := make([]byte, fileSize)
|
|
crand.Read(data)
|
|
fileName := fmt.Sprintf("ec_test_file_%d", time.Now().Unix())
|
|
t.Logf("Uploading %d bytes file %s to Filer...", fileSize, fileName)
|
|
uploadUrl := FilerUrl + "/" + fileName
|
|
|
|
var uploadErr error
|
|
for i := 0; i < 10; i++ {
|
|
req, err := http.NewRequest("PUT", uploadUrl, bytes.NewBuffer(data))
|
|
if err != nil {
|
|
uploadErr = err
|
|
t.Logf("Upload attempt %d failed to create request: %v", i+1, err)
|
|
time.Sleep(2 * time.Second)
|
|
continue
|
|
}
|
|
resp, err := client.Do(req)
|
|
if err == nil {
|
|
if resp.StatusCode == 201 {
|
|
resp.Body.Close()
|
|
uploadErr = nil
|
|
break
|
|
}
|
|
body, _ := io.ReadAll(resp.Body)
|
|
resp.Body.Close()
|
|
uploadErr = fmt.Errorf("status %d: %s", resp.StatusCode, string(body))
|
|
} else {
|
|
uploadErr = err
|
|
}
|
|
t.Logf("Upload attempt %d failed: %v", i+1, uploadErr)
|
|
time.Sleep(2 * time.Second)
|
|
}
|
|
|
|
if uploadErr != nil {
|
|
t.Fatalf("Failed to upload file after retries: %v", uploadErr)
|
|
}
|
|
t.Log("Upload successful")
|
|
|
|
// 3. Verify EC Encoding
|
|
t.Log("Waiting for EC encoding (checking Master topology)...")
|
|
startTime := time.Now()
|
|
ecVerified := false
|
|
var lastBody []byte
|
|
debugTick := 0
|
|
|
|
for time.Since(startTime) < 300*time.Second {
|
|
// 3.1 Check Master Topology
|
|
resp, err := http.Get(MasterUrl + "/dir/status")
|
|
if err == nil {
|
|
lastBody, _ = io.ReadAll(resp.Body)
|
|
resp.Body.Close()
|
|
|
|
// Check total EC shards
|
|
reShards := regexp.MustCompile(`"EcShards":\s*(\d+)`)
|
|
matches := reShards.FindAllSubmatch(lastBody, -1)
|
|
totalShards := 0
|
|
for _, m := range matches {
|
|
var count int
|
|
fmt.Sscanf(string(m[1]), "%d", &count)
|
|
totalShards += count
|
|
}
|
|
|
|
if totalShards > 0 {
|
|
t.Logf("EC encoding verified (found %d total EcShards in topology) after %d seconds", totalShards, int(time.Since(startTime).Seconds()))
|
|
ecVerified = true
|
|
break
|
|
}
|
|
}
|
|
|
|
// 3.2 Debug: Check workers, jobs, and scheduler status
|
|
debugTick++
|
|
|
|
var workers []map[string]interface{}
|
|
workerCount := 0
|
|
ecDetectorCount := 0
|
|
ecExecutorCount := 0
|
|
if err := fetchJSON(AdminUrl+"/api/plugin/workers", &workers); err == nil {
|
|
workerCount = len(workers)
|
|
for _, worker := range workers {
|
|
capsValue, ok := mapFieldAny(worker, "capabilities", "Capabilities")
|
|
if !ok {
|
|
continue
|
|
}
|
|
caps, ok := capsValue.(map[string]interface{})
|
|
if !ok {
|
|
continue
|
|
}
|
|
if capValue, ok := caps["erasure_coding"].(map[string]interface{}); ok {
|
|
if capValue["can_detect"] == true {
|
|
ecDetectorCount++
|
|
}
|
|
if capValue["can_execute"] == true {
|
|
ecExecutorCount++
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
var tasks []map[string]interface{}
|
|
taskCount := 0
|
|
ecTaskCount := 0
|
|
ecTaskStates := map[string]int{}
|
|
if err := fetchJSON(AdminUrl+"/api/plugin/jobs?limit=1000", &tasks); err == nil {
|
|
taskCount = len(tasks)
|
|
for _, task := range tasks {
|
|
jobType, _ := task["job_type"].(string)
|
|
state, _ := task["state"].(string)
|
|
if jobType == "erasure_coding" {
|
|
ecTaskCount++
|
|
ecTaskStates[state]++
|
|
}
|
|
}
|
|
}
|
|
|
|
t.Logf("Waiting for EC... (Workers: %d det=%d exec=%d, Tasks: %d ec=%d, EC States: %+v)",
|
|
workerCount, ecDetectorCount, ecExecutorCount, taskCount, ecTaskCount, ecTaskStates)
|
|
|
|
if debugTick%3 == 0 {
|
|
var pluginStatus map[string]interface{}
|
|
if err := fetchJSON(AdminUrl+"/api/plugin/status", &pluginStatus); err == nil {
|
|
t.Logf("Plugin status: enabled=%v worker_count=%v worker_grpc_port=%v configured=%v",
|
|
pluginStatus["enabled"], pluginStatus["worker_count"], pluginStatus["worker_grpc_port"], pluginStatus["configured"])
|
|
}
|
|
|
|
var schedulerStatus map[string]interface{}
|
|
if err := fetchJSON(AdminUrl+"/api/plugin/scheduler-status", &schedulerStatus); err == nil {
|
|
if schedValue, ok := schedulerStatus["scheduler"].(map[string]interface{}); ok {
|
|
t.Logf("Scheduler status: current_job_type=%v phase=%v last_iteration_had_jobs=%v idle_sleep_seconds=%v last_iteration_done_at=%v next_detection_at=%v",
|
|
schedValue["current_job_type"], schedValue["current_phase"],
|
|
schedValue["last_iteration_had_jobs"], schedValue["idle_sleep_seconds"], schedValue["last_iteration_done_at"], schedValue["next_detection_at"])
|
|
} else {
|
|
t.Logf("Scheduler status: %v", schedulerStatus)
|
|
}
|
|
}
|
|
|
|
var schedulerStates []map[string]interface{}
|
|
if err := fetchJSON(AdminUrl+"/api/plugin/scheduler-states", &schedulerStates); err == nil {
|
|
for _, state := range schedulerStates {
|
|
if state["job_type"] == "erasure_coding" {
|
|
t.Logf("EC scheduler state: enabled=%v detection_in_flight=%v detector_available=%v executor_workers=%v next_detection_at=%v last_run_status=%v last_run_started_at=%v last_run_completed_at=%v",
|
|
state["enabled"], state["detection_in_flight"], state["detector_available"],
|
|
state["executor_worker_count"], state["next_detection_at"], state["last_run_status"],
|
|
state["last_run_started_at"], state["last_run_completed_at"])
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
var jobTypes []map[string]interface{}
|
|
if err := fetchJSON(AdminUrl+"/api/plugin/job-types", &jobTypes); err == nil {
|
|
var names []string
|
|
for _, jobType := range jobTypes {
|
|
if name, ok := jobType["job_type"].(string); ok && name != "" {
|
|
names = append(names, name)
|
|
}
|
|
}
|
|
t.Logf("Plugin job types: %v", names)
|
|
}
|
|
|
|
var activities []map[string]interface{}
|
|
if err := fetchJSON(AdminUrl+"/api/plugin/activities?job_type=erasure_coding&limit=5", &activities); err == nil {
|
|
for i := len(activities) - 1; i >= 0; i-- {
|
|
act := activities[i]
|
|
t.Logf("EC activity: stage=%v message=%v occurred_at=%v", act["stage"], act["message"], act["occurred_at"])
|
|
}
|
|
}
|
|
}
|
|
|
|
time.Sleep(10 * time.Second)
|
|
}
|
|
|
|
if !ecVerified {
|
|
dumpLogs(t)
|
|
t.Fatalf("Timed out waiting for EC encoding verified in Topology. Last body: %s", string(lastBody))
|
|
}
|
|
|
|
// 6. Verification: Read back the file
|
|
t.Log("Reading back file...")
|
|
resp, err = http.Get(uploadUrl)
|
|
if err != nil {
|
|
dumpLogs(t)
|
|
t.Fatalf("Failed to read back file: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != 200 {
|
|
dumpLogs(t)
|
|
t.Fatalf("Read back failed status: %d", resp.StatusCode)
|
|
}
|
|
content, _ := io.ReadAll(resp.Body)
|
|
if len(content) != fileSize {
|
|
dumpLogs(t)
|
|
t.Fatalf("Read back size mismatch: got %d, want %d", len(content), fileSize)
|
|
}
|
|
|
|
// Verify byte-wise content equality
|
|
if !bytes.Equal(content, data) {
|
|
dumpLogs(t)
|
|
t.Fatalf("Read back content mismatch: uploaded and downloaded data differ")
|
|
}
|
|
|
|
t.Log("Test PASS: EC encoding and read back successful!")
|
|
}
|
|
|
|
// dumpLogs copies every *.log file under ./tmp/logs into the test log. That
// directory is where startWeed writes each subprocess's output. Call it
// before failing so the subprocess output appears in the test report.
// Problems reading the logs are reported with t.Logf rather than failing the
// test, so the original failure stays the primary message.
func dumpLogs(t *testing.T) {
	t.Helper()

	wd, err := os.Getwd()
	if err != nil {
		t.Logf("dumpLogs: cannot determine working directory: %v", err)
		return
	}
	logDir := filepath.Join(wd, "tmp", "logs")
	files, err := os.ReadDir(logDir)
	if err != nil {
		t.Logf("dumpLogs: cannot list %s: %v", logDir, err)
		return
	}
	for _, f := range files {
		if !strings.HasSuffix(f.Name(), ".log") {
			continue
		}
		path := filepath.Join(logDir, f.Name())
		content, err := os.ReadFile(path)
		if err != nil {
			t.Logf("dumpLogs: cannot read %s: %v", path, err)
			continue
		}
		t.Logf("--- LOG DUMP: %s ---\n%s\n--- END LOG ---", f.Name(), string(content))
	}
}
|