// seaweedfs/test/fuse_dlm/framework_test.go

package fuse_dlm
import (
"context"
"fmt"
"net"
"os"
"os/exec"
"path/filepath"
"strconv"
"sync"
"syscall"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
const filerGroup = "fuse-dlm-test"
// dlmTestCluster manages a full SeaweedFS cluster with 2 filers and 2 FUSE
// mounts for testing DLM-based cross-mount write coordination.
type dlmTestCluster struct {
t testing.TB
baseDir string
weedBinary string
masterPort int
masterGrpcPort int
volumePort int
volumeGrpcPort int
filerPorts [2]int
filerGrpcPorts [2]int
mountPoints [2]string
masterCmd *exec.Cmd
volumeCmd *exec.Cmd
filerCmds [2]*exec.Cmd
mountCmds [2]*exec.Cmd
logFiles []*os.File
cleanupOnce sync.Once
}
func startDLMTestCluster(t testing.TB) *dlmTestCluster {
binary := findWeedBinary()
if binary == "" {
t.Skip("weed binary not found; set WEED_BINARY or ensure it is on PATH")
}
baseDir, err := os.MkdirTemp("", "seaweedfs_fuse_dlm_test_")
require.NoError(t, err)
c := &dlmTestCluster{
t: t,
baseDir: baseDir,
weedBinary: binary,
}
// Register cleanup early so processes are stopped even if a require fails below.
t.Cleanup(c.Stop)
// Allocate ports: master(2) + volume(2) + filer0(2) + filer1(2) = 8
ports := allocatePorts(t, 8)
c.masterPort = ports[0]
c.masterGrpcPort = ports[1]
c.volumePort = ports[2]
c.volumeGrpcPort = ports[3]
c.filerPorts[0] = ports[4]
c.filerGrpcPorts[0] = ports[5]
c.filerPorts[1] = ports[6]
c.filerGrpcPorts[1] = ports[7]
// Write empty security.toml
configDir := filepath.Join(baseDir, "config")
require.NoError(t, os.MkdirAll(configDir, 0755))
require.NoError(t, os.WriteFile(filepath.Join(configDir, "security.toml"), []byte(""), 0644))
// Start master
require.NoError(t, c.startMaster(configDir))
require.NoError(t, c.waitForTCP(fmt.Sprintf("127.0.0.1:%d", c.masterPort), 30*time.Second),
"master not ready\n%s", c.tailLog("master"))
// Start volume
require.NoError(t, c.startVolume(configDir))
require.NoError(t, c.waitForTCP(fmt.Sprintf("127.0.0.1:%d", c.volumePort), 30*time.Second),
"volume not ready\n%s", c.tailLog("volume"))
// Start 2 filers
for i := 0; i < 2; i++ {
require.NoError(t, c.startFiler(i, configDir))
require.NoError(t, c.waitForTCP(fmt.Sprintf("127.0.0.1:%d", c.filerGrpcPorts[i]), 30*time.Second),
"filer %d not ready\n%s", i, c.tailLog(fmt.Sprintf("filer%d", i)))
}
require.NoError(t, c.waitForFilerCount(2, 30*time.Second), "filer group registration")
require.NoError(t, c.waitForLockRingConverged(30*time.Second), "lock ring convergence")
// Start 2 mounts, both pointing at filer0 for metadata consistency.
// (filer1 exists for the DLM lock ring but both mounts share filer0's
// metadata store since leveldb is per-filer.)
for i := 0; i < 2; i++ {
mp := filepath.Join(baseDir, fmt.Sprintf("mount%d", i))
require.NoError(t, os.MkdirAll(mp, 0755))
c.mountPoints[i] = mp
require.NoError(t, c.startMount(i, configDir))
require.NoError(t, c.waitForMount(mp, 30*time.Second),
"mount %d not ready\n%s", i, c.tailLog(fmt.Sprintf("mount%d", i)))
}
return c
}
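// exampleCrossMountUsage is a minimal usage sketch of this framework, not one
// of the package's real tests: the file name, contents, and assertions below
// are illustrative assumptions, shown only to demonstrate how
// startDLMTestCluster and the two mount points are intended to be used together.
func exampleCrossMountUsage(t *testing.T) {
	c := startDLMTestCluster(t)
	// Write a file through mount 0.
	src := filepath.Join(c.mountPoints[0], "hello.txt")
	require.NoError(t, os.WriteFile(src, []byte("hello"), 0644))
	// Read it back through mount 1, which shares filer0's metadata store.
	dst := filepath.Join(c.mountPoints[1], "hello.txt")
	data, err := os.ReadFile(dst)
	require.NoError(t, err)
	require.Equal(t, []byte("hello"), data)
}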
func (c *dlmTestCluster) Stop() {
if c == nil {
return
}
c.cleanupOnce.Do(func() {
// Stop mounts first (triggers flush + DLM unlock).
// Use stopCmd for bounded wait to avoid hanging on wedged FUSE processes.
for i := 1; i >= 0; i-- {
stopCmd(c.mountCmds[i])
// Backup unmount in case FUSE teardown didn't clean up
exec.Command("fusermount3", "-u", c.mountPoints[i]).Run()
exec.Command("fusermount", "-u", c.mountPoints[i]).Run()
}
// Stop filers, volume, master
for i := 1; i >= 0; i-- {
stopCmd(c.filerCmds[i])
}
stopCmd(c.volumeCmd)
stopCmd(c.masterCmd)
for _, f := range c.logFiles {
f.Close()
}
// Copy logs for CI
c.copyLogsForCI()
if !c.t.Failed() {
os.RemoveAll(c.baseDir)
}
// Wait for ports to be fully released before the next test
// allocates new ports (avoids TIME_WAIT collisions).
time.Sleep(2 * time.Second)
})
}
// masterAddress returns the master address in the format that encodes both
// HTTP and gRPC ports: "host:httpPort.grpcPort". This is the format that
// SeaweedFS uses to communicate non-default gRPC ports between components.
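// For example, a server at HTTP port 8111 with gRPC port 18112 would be
// rendered as "127.0.0.1:8111.18112" (port numbers here are illustrative;
// the real ports are allocated dynamically by allocatePorts).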
func (c *dlmTestCluster) masterAddress() string {
return string(pb.NewServerAddress("127.0.0.1", c.masterPort, c.masterGrpcPort))
}
func (c *dlmTestCluster) startMaster(configDir string) error {
c.masterCmd = exec.Command(c.weedBinary,
"-logdir="+filepath.Join(c.baseDir, "logs"),
"master",
"-ip=127.0.0.1",
"-ip.bind=127.0.0.1",
"-port="+strconv.Itoa(c.masterPort),
"-port.grpc="+strconv.Itoa(c.masterGrpcPort),
"-mdir="+filepath.Join(c.baseDir, "master"),
)
return c.startCmd(c.masterCmd, "master")
}
func (c *dlmTestCluster) startVolume(configDir string) error {
volDir := filepath.Join(c.baseDir, "volume")
if err := os.MkdirAll(volDir, 0755); err != nil {
return fmt.Errorf("create volume dir: %w", err)
}
c.volumeCmd = exec.Command(c.weedBinary,
"-logdir="+filepath.Join(c.baseDir, "logs"),
"volume",
"-ip=127.0.0.1",
"-ip.bind=127.0.0.1",
"-port="+strconv.Itoa(c.volumePort),
"-port.grpc="+strconv.Itoa(c.volumeGrpcPort),
"-master="+c.masterAddress(),
"-dir="+volDir,
"-max=10",
)
return c.startCmd(c.volumeCmd, "volume")
}
func (c *dlmTestCluster) startFiler(idx int, configDir string) error {
filerDir := filepath.Join(c.baseDir, fmt.Sprintf("filer%d", idx))
if err := os.MkdirAll(filerDir, 0755); err != nil {
return fmt.Errorf("create filer dir: %w", err)
}
c.filerCmds[idx] = exec.Command(c.weedBinary,
"-logdir="+filepath.Join(c.baseDir, "logs"),
"filer",
"-ip=127.0.0.1",
"-ip.bind=127.0.0.1",
"-port="+strconv.Itoa(c.filerPorts[idx]),
"-port.grpc="+strconv.Itoa(c.filerGrpcPorts[idx]),
"-master="+c.masterAddress(),
"-filerGroup="+filerGroup,
"-defaultStoreDir="+filerDir,
)
return c.startCmd(c.filerCmds[idx], fmt.Sprintf("filer%d", idx))
}
func (c *dlmTestCluster) filerAddress(idx int) string {
return string(pb.NewServerAddress("127.0.0.1", c.filerPorts[idx], c.filerGrpcPorts[idx]))
}
func (c *dlmTestCluster) startMount(idx int, configDir string) error {
cacheDir := filepath.Join(c.baseDir, fmt.Sprintf("cache%d", idx))
if err := os.MkdirAll(cacheDir, 0755); err != nil {
return fmt.Errorf("create cache dir: %w", err)
}
c.mountCmds[idx] = exec.Command(c.weedBinary,
"-logdir="+filepath.Join(c.baseDir, "logs"),
"mount",
"-filer="+c.filerAddress(0), // both mounts use filer0 for shared metadata
"-dir="+c.mountPoints[idx],
"-filer.path=/",
"-dirAutoCreate",
"-allowOthers=false",
"-cacheDir="+cacheDir,
"-dlm",
)
return c.startCmd(c.mountCmds[idx], fmt.Sprintf("mount%d", idx))
}
func (c *dlmTestCluster) startCmd(cmd *exec.Cmd, name string) error {
logPath := filepath.Join(c.baseDir, "logs")
if err := os.MkdirAll(logPath, 0755); err != nil {
return fmt.Errorf("create log dir: %w", err)
}
logFile, err := os.Create(filepath.Join(logPath, name+".log"))
if err != nil {
return err
}
c.logFiles = append(c.logFiles, logFile)
cmd.Stdout = logFile
cmd.Stderr = logFile
return cmd.Start()
}
func (c *dlmTestCluster) tailLog(name string) string {
data, err := os.ReadFile(filepath.Join(c.baseDir, "logs", name+".log"))
if err != nil {
return fmt.Sprintf("(log %s not available: %v)", name, err)
}
const maxTail = 8192
if len(data) > maxTail {
data = data[len(data)-maxTail:]
}
return string(data)
}
func (c *dlmTestCluster) copyLogsForCI() {
ciLogDir := "/tmp/seaweedfs-fuse-dlm-logs"
os.MkdirAll(ciLogDir, 0755)
logsDir := filepath.Join(c.baseDir, "logs")
entries, err := os.ReadDir(logsDir)
if err != nil {
return
}
for _, e := range entries {
data, err := os.ReadFile(filepath.Join(logsDir, e.Name()))
if err != nil {
continue
}
os.WriteFile(filepath.Join(ciLogDir, e.Name()), data, 0644)
}
}
func (c *dlmTestCluster) waitForTCP(addr string, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
conn, err := net.DialTimeout("tcp", addr, time.Second)
if err == nil {
conn.Close()
return nil
}
time.Sleep(200 * time.Millisecond)
}
return fmt.Errorf("service at %s not ready within timeout", addr)
}
// waitForMount waits for a FUSE filesystem to actually be mounted at
// mountPoint by comparing the device ID of the mount point against its parent.
// A plain directory (pre-created before mount) has the same device as its
// parent; a mounted FUSE filesystem has a different device.
func (c *dlmTestCluster) waitForMount(mountPoint string, timeout time.Duration) error {
parentDir := filepath.Dir(mountPoint)
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
parentStat, err := os.Stat(parentDir)
if err != nil {
time.Sleep(200 * time.Millisecond)
continue
}
mountStat, err := os.Stat(mountPoint)
if err != nil {
time.Sleep(200 * time.Millisecond)
continue
}
parentSys := parentStat.Sys().(*syscall.Stat_t)
mountSys := mountStat.Sys().(*syscall.Stat_t)
if parentSys.Dev != mountSys.Dev {
// Different device = FUSE mounted
return nil
}
time.Sleep(200 * time.Millisecond)
}
return fmt.Errorf("mount point %s not ready within timeout (FUSE not detected)", mountPoint)
}
func (c *dlmTestCluster) filerGRPCAddress(idx int) string {
return fmt.Sprintf("127.0.0.1:%d", c.filerGrpcPorts[idx])
}
func (c *dlmTestCluster) waitForFilerCount(expected int, timeout time.Duration) error {
addr := fmt.Sprintf("127.0.0.1:%d", c.masterGrpcPort)
conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}
defer conn.Close()
client := master_pb.NewSeaweedClient(conn)
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := client.ListClusterNodes(ctx, &master_pb.ListClusterNodesRequest{
ClientType: "filer",
FilerGroup: filerGroup,
})
cancel()
if err == nil && len(resp.ClusterNodes) >= expected {
return nil
}
time.Sleep(200 * time.Millisecond)
}
return fmt.Errorf("timed out waiting for %d filers in group %q", expected, filerGroup)
}
// waitForLockRingConverged verifies that both filers have a consistent view of
// the lock ring by acquiring the same lock through each filer and checking
// mutual exclusion. Adapted from test/s3/distributed_lock/.
func (c *dlmTestCluster) waitForLockRingConverged(timeout time.Duration) error {
deadline := time.Now().Add(timeout)
owners := []pb.ServerAddress{
pb.ServerAddress(c.filerGRPCAddress(0)),
pb.ServerAddress(c.filerGRPCAddress(1)),
}
ring := lock_manager.NewHashRing(lock_manager.DefaultVnodeCount)
ring.SetServers(owners)
attempt := 0
for time.Now().Before(deadline) {
testKeys := convergenceKeysPerPrimary(ring, owners, attempt)
attempt++
allConverged := true
for _, key := range testKeys {
converged, _ := c.checkLockMutualExclusion(key)
if !converged {
allConverged = false
break
}
}
if allConverged {
return nil
}
time.Sleep(500 * time.Millisecond)
}
return fmt.Errorf("lock ring did not converge")
}
// convergenceKeysPerPrimary generates one test key per primary filer.
func convergenceKeysPerPrimary(ring *lock_manager.HashRing, owners []pb.ServerAddress, attempt int) []string {
found := make(map[pb.ServerAddress]bool)
var keys []string
for i := 0; len(found) < len(owners) && i < 10000; i++ {
key := fmt.Sprintf("convergence-test-%d-%d", attempt, i)
primary, _ := ring.GetPrimaryAndBackup(key)
if !found[primary] {
found[primary] = true
keys = append(keys, key)
}
}
return keys
}
func (c *dlmTestCluster) checkLockMutualExclusion(key string) (bool, error) {
// Try to lock via filer0
conn0, err := grpc.NewClient(c.filerGRPCAddress(0), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return false, err
}
defer conn0.Close()
client0 := filer_pb.NewSeaweedFilerClient(conn0)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp0, err := client0.DistributedLock(ctx, &filer_pb.LockRequest{
Name: key,
SecondsToLock: 5,
Owner: "convergence-test-0",
})
if err != nil {
return false, err
}
if resp0.Error != "" {
return false, fmt.Errorf("lock0: %s", resp0.Error)
}
// Try to lock via filer1 — should fail (already locked)
conn1, err := grpc.NewClient(c.filerGRPCAddress(1), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return false, err
}
defer conn1.Close()
client1 := filer_pb.NewSeaweedFilerClient(conn1)
resp1, err := client1.DistributedLock(ctx, &filer_pb.LockRequest{
Name: key,
SecondsToLock: 5,
Owner: "convergence-test-1",
})
// Unlock via filer0
client0.DistributedUnlock(ctx, &filer_pb.UnlockRequest{
Name: key,
RenewToken: resp0.RenewToken,
})
if err != nil {
return false, err
}
// If filer1 also got the lock, the ring hasn't converged
if resp1.Error == "" {
// Unlock the second one too
client1.DistributedUnlock(ctx, &filer_pb.UnlockRequest{
Name: key,
RenewToken: resp1.RenewToken,
})
return false, nil
}
return true, nil
}
func stopCmd(cmd *exec.Cmd) {
if cmd == nil || cmd.Process == nil {
return
}
cmd.Process.Signal(syscall.SIGTERM)
done := make(chan struct{})
go func() {
cmd.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(10 * time.Second):
cmd.Process.Kill()
<-done
}
}
func allocatePorts(t testing.TB, n int) []int {
t.Helper()
// Hold all listeners open until all ports are collected, then close
// them together. This prevents the OS from reassigning a just-freed
// port to the next Listen call within the same allocation batch.
listeners := make([]net.Listener, 0, n)
ports := make([]int, 0, n)
defer func() {
for _, l := range listeners {
l.Close()
}
}()
for i := 0; i < n; i++ {
l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
listeners = append(listeners, l)
ports = append(ports, l.Addr().(*net.TCPAddr).Port)
}
return ports
}
func findWeedBinary() string {
if p := os.Getenv("WEED_BINARY"); p != "" {
return p
}
if p, err := exec.LookPath("weed"); err == nil {
return p
}
candidates := []string{
"../../weed/weed",
"./weed",
}
for _, c := range candidates {
if info, err := os.Stat(c); err == nil && !info.IsDir() {
abs, _ := filepath.Abs(c)
return abs
}
}
return ""
}
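// To run these tests locally, something along these lines should work (the
// package path is inferred from this file's location and may differ):
//
//	WEED_BINARY=/path/to/weed go test -v ./test/fuse_dlm/...
//
// When WEED_BINARY is unset, findWeedBinary falls back to `weed` on PATH and
// the relative candidate paths above, and the tests skip if no binary is found.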