mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-14 05:41:29 +00:00
fix(test): stabilize ConcurrentLockContention; warn on coherence drift
TestPosixFileLocking/ConcurrentLockContention failed in CI (run 24857323067) with ENOENT when re-opening the file after all 8 workers had successfully written and closed. The 20s openWithRetry budget was exhausted, pointing at a real but unproven metaCache/parent-cache coherence issue in the mount under bursts of concurrent Release. Test: hold the initial fd open for the whole subtest; use it for the post-workers Sync() and the verification read. Workers still exercise the concurrent-flock invariant and per-record write correctness; the re-open path is no longer load-bearing. On Eventually failure, dump ReadDir of the parent, Stat, and a fresh O_RDONLY open so a future recurrence has state to debug from. Drop the darwin-only ENOENT t.Skip branches that hid this same flake. Mount: in weedfs.lookupEntry, when returning ENOENT from the "parent cached but child missing" branch, log at Warningf instead of V(4) when the kernel is still tracking this path's inode. That combination is the smoking-gun signal for cache drift and is rare enough in normal use not to spam the log.
This commit is contained in:
@@ -9,7 +9,6 @@ import (
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -633,9 +632,14 @@ func testFcntlReleaseOnClose(t *testing.T, fw *FuseTestFramework) {
|
||||
// reader goroutines (blocking FUSE SETLKW can starve unlock processing).
|
||||
func testConcurrentLockContention(t *testing.T, fw *FuseTestFramework) {
|
||||
path := filepath.Join(fw.GetMountPoint(), "lock_contention.txt")
|
||||
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
|
||||
// Hold a master fd open for the whole subtest. Workers take flock on their
|
||||
// own fds; master stays passive. Keeping this reference pinned avoids the
|
||||
// re-open-after-concurrent-close code path, which has shown an unresolved
|
||||
// mount/metaCache coherence bug where a freshly-opened fd sees ENOENT for
|
||||
// tens of seconds after a burst of concurrent Release calls.
|
||||
master, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, f.Close())
|
||||
defer master.Close()
|
||||
|
||||
numWorkers := 8
|
||||
writesPerWorker := 5
|
||||
@@ -736,45 +740,57 @@ func testConcurrentLockContention(t *testing.T, fw *FuseTestFramework) {
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
if runtime.GOOS == "darwin" {
|
||||
for _, err := range errs {
|
||||
if err != nil && strings.Contains(err.Error(), "no such file or directory") {
|
||||
t.Skip("lock contention file disappeared on darwin FUSE; skipping flaky check")
|
||||
}
|
||||
}
|
||||
}
|
||||
require.Empty(t, errs, "concurrent lock contention errors: %v", errs)
|
||||
|
||||
flush := func() {
|
||||
verify, err := openWithRetry(os.O_RDWR)
|
||||
if err != nil && runtime.GOOS == "darwin" && strings.Contains(err.Error(), "no such file or directory") {
|
||||
t.Skip("lock contention file disappeared on darwin FUSE; skipping flaky check")
|
||||
}
|
||||
require.NoError(t, err)
|
||||
defer verify.Close()
|
||||
require.NoError(t, verify.Sync())
|
||||
}
|
||||
flush()
|
||||
// Flush via the master fd. fsync on any fd to the file forces the
|
||||
// kernel to send FUSE_FSYNC, which drains workers' writes to the filer.
|
||||
require.NoError(t, master.Sync())
|
||||
|
||||
expectedLines := numWorkers * writesPerWorker
|
||||
expectedBytes := expectedLines * recordSize
|
||||
var data []byte
|
||||
require.Eventually(t, func() bool {
|
||||
verify, err := openWithRetry(os.O_RDONLY)
|
||||
if err != nil {
|
||||
if runtime.GOOS == "darwin" && strings.Contains(err.Error(), "no such file or directory") {
|
||||
t.Skip("lock contention file disappeared on darwin FUSE; skipping flaky check")
|
||||
}
|
||||
var lastReadErr error
|
||||
ok := assert.Eventually(t, func() bool {
|
||||
if _, err := master.Seek(0, io.SeekStart); err != nil {
|
||||
lastReadErr = fmt.Errorf("seek: %w", err)
|
||||
return false
|
||||
}
|
||||
defer verify.Close()
|
||||
|
||||
data, err = io.ReadAll(verify)
|
||||
d, err := io.ReadAll(master)
|
||||
if err != nil {
|
||||
lastReadErr = fmt.Errorf("read: %w", err)
|
||||
return false
|
||||
}
|
||||
data = d
|
||||
lastReadErr = nil
|
||||
return len(data) == expectedBytes
|
||||
}, 15*time.Second, 100*time.Millisecond, "file should eventually contain exactly %d records from all workers", expectedLines)
|
||||
if !ok {
|
||||
// Diagnostics: dump what the mount currently shows for the parent dir
|
||||
// and the target path, so post-mortem analysis has something to go on.
|
||||
parent := filepath.Dir(path)
|
||||
if entries, readErr := os.ReadDir(parent); readErr != nil {
|
||||
t.Logf("diag: ReadDir(%q) failed: %v", parent, readErr)
|
||||
} else {
|
||||
names := make([]string, 0, len(entries))
|
||||
for _, e := range entries {
|
||||
names = append(names, e.Name())
|
||||
}
|
||||
t.Logf("diag: ReadDir(%q) returned %d entries: %v", parent, len(names), names)
|
||||
}
|
||||
if fi, statErr := os.Stat(path); statErr != nil {
|
||||
t.Logf("diag: Stat(%q) failed: %v", path, statErr)
|
||||
} else {
|
||||
t.Logf("diag: Stat(%q) ok: size=%d mode=%s", path, fi.Size(), fi.Mode())
|
||||
}
|
||||
if freshFd, openErr := os.OpenFile(path, os.O_RDONLY, 0); openErr != nil {
|
||||
t.Logf("diag: fresh open(%q, O_RDONLY) failed: %v", path, openErr)
|
||||
} else {
|
||||
fresh, readErr := io.ReadAll(freshFd)
|
||||
freshFd.Close()
|
||||
t.Logf("diag: fresh open(%q) read %d bytes (err=%v)", path, len(fresh), readErr)
|
||||
}
|
||||
t.Fatalf("read back incomplete: got %d bytes, want %d (lastReadErr=%v)", len(data), expectedBytes, lastReadErr)
|
||||
}
|
||||
actualLines := bytes.Count(data, []byte("\n"))
|
||||
assert.Equal(t, expectedLines, actualLines,
|
||||
"file should contain exactly %d lines from all workers", expectedLines)
|
||||
|
||||
@@ -583,7 +583,15 @@ func (wfs *WFS) lookupEntry(fullpath util.FullPath) (*filer.Entry, fuse.Status)
|
||||
// our IsDirectoryCached check and FindEntry (e.g. markDirectoryReadThrough).
|
||||
// If it's no longer cached, fall through to the filer lookup below.
|
||||
if wfs.metaCache.IsDirectoryCached(dirPath) {
|
||||
glog.V(4).Infof("lookupEntry cache miss (dir cached) %s", fullpath)
|
||||
// If the kernel is still tracking this path's inode, the entry
|
||||
// was known to exist recently; a cached-dir miss here suggests
|
||||
// metaCache/parent-cache coherence drift. Log visibly so the
|
||||
// next occurrence shows up in mount.log without -v=4.
|
||||
if _, inodeFound := wfs.inodeToPath.GetInode(fullpath); inodeFound {
|
||||
glog.Warningf("lookupEntry: %s missing from cache while parent %s is cached; inode still tracked (possible coherence bug)", fullpath, dirPath)
|
||||
} else {
|
||||
glog.V(4).Infof("lookupEntry cache miss (dir cached) %s", fullpath)
|
||||
}
|
||||
return nil, fuse.ENOENT
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user