mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-28 20:50:20 +00:00
test: wait for a writable volume before lifecycle tests' first write (#9658)
Probe one throwaway write once per process before the lifecycle tests run, absorbing the post-start volume-growth window so the first real PutObject doesn't race volume growth and 500. Each call is bounded by the remaining 60s budget; CreateBucket is retried within it.
This commit is contained in:
@@ -22,6 +22,7 @@ import (
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -69,7 +70,111 @@ func s3Client(t *testing.T) *s3.Client {
|
||||
})),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
return s3.NewFromConfig(cfg, func(o *s3.Options) { o.UsePathStyle = true })
|
||||
client := s3.NewFromConfig(cfg, func(o *s3.Options) { o.UsePathStyle = true })
|
||||
ensureClusterWritable(t, client)
|
||||
return client
|
||||
}
|
||||
|
||||
// clusterWritableOnce ensures the warmup probe in ensureClusterWritable runs
// at most once per test process.
var clusterWritableOnce sync.Once
|
||||
|
||||
// ensureClusterWritable blocks until the cluster can actually serve a write,
|
||||
// absorbing the volume-growth warmup window after a fresh start. The Makefile
|
||||
// only waits for the server process to be up ("server up after N s"); it does
|
||||
// not wait for a writable volume, so the first PutObject can race volume growth
|
||||
// and fail with a transient 500 (assign volume: DeadlineExceeded) — the source
|
||||
// of the lifecycle-test flakes. Probing one throwaway write here, once per
|
||||
// process, warms growth so every test's first real write is past that window.
|
||||
// Best-effort: if it never becomes writable, the test's own PutObject surfaces
|
||||
// the failure normally.
|
||||
func ensureClusterWritable(t *testing.T, c *s3.Client) {
|
||||
t.Helper()
|
||||
clusterWritableOnce.Do(func() {
|
||||
bucket := uniqueBucket("warmup")
|
||||
deadline := time.Now().Add(60 * time.Second)
|
||||
|
||||
// try runs fn under a bounded context so a single hung call can't block.
|
||||
try := func(timeout time.Duration, fn func(ctx context.Context) error) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
return fn(ctx)
|
||||
}
|
||||
// probe is a try whose timeout is clamped to the time left before the
|
||||
// deadline, so the whole warmup stays within the budget; ok=false means
|
||||
// the budget is exhausted and the caller should stop.
|
||||
probe := func(fn func(ctx context.Context) error) (err error, ok bool) {
|
||||
remaining := time.Until(deadline)
|
||||
if remaining <= 0 {
|
||||
return nil, false
|
||||
}
|
||||
if remaining > 10*time.Second {
|
||||
remaining = 10 * time.Second
|
||||
}
|
||||
return try(remaining, fn), true
|
||||
}
|
||||
// backoff sleeps attempt*250ms, never past the deadline.
|
||||
backoff := func(attempt int) {
|
||||
d := time.Duration(attempt) * 250 * time.Millisecond
|
||||
if left := time.Until(deadline); d > left {
|
||||
d = left
|
||||
}
|
||||
if d > 0 {
|
||||
time.Sleep(d)
|
||||
}
|
||||
}
|
||||
|
||||
// CreateBucket is a metadata op, but on a cold cluster the filer itself may
|
||||
// not be ready yet, so retry it within the deadline rather than abandoning
|
||||
// the whole warmup (and the PutObject probe) on the first error.
|
||||
created := false
|
||||
for attempt := 1; ; attempt++ {
|
||||
err, ok := probe(func(ctx context.Context) error {
|
||||
_, e := c.CreateBucket(ctx, &s3.CreateBucketInput{Bucket: aws.String(bucket)})
|
||||
return e
|
||||
})
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if err == nil {
|
||||
created = true
|
||||
break
|
||||
}
|
||||
backoff(attempt)
|
||||
}
|
||||
if !created {
|
||||
t.Logf("warmup: could not create probe bucket within 60s; proceeding")
|
||||
return
|
||||
}
|
||||
// Cleanup gets a fresh timeout (not the warmup budget) so teardown runs
|
||||
// even when the probe loop used the full window.
|
||||
defer try(10*time.Second, func(ctx context.Context) error {
|
||||
_, e := c.DeleteBucket(ctx, &s3.DeleteBucketInput{Bucket: aws.String(bucket)})
|
||||
return e
|
||||
})
|
||||
|
||||
for attempt := 1; ; attempt++ {
|
||||
err, ok := probe(func(ctx context.Context) error {
|
||||
_, e := c.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(bucket), Key: aws.String("warmup"), Body: strings.NewReader("ok"),
|
||||
})
|
||||
return e
|
||||
})
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if err == nil {
|
||||
try(10*time.Second, func(ctx context.Context) error {
|
||||
_, e := c.DeleteObject(ctx, &s3.DeleteObjectInput{Bucket: aws.String(bucket), Key: aws.String("warmup")})
|
||||
return e
|
||||
})
|
||||
if attempt > 1 {
|
||||
t.Logf("cluster became writable after %d probe(s)", attempt)
|
||||
}
|
||||
return
|
||||
}
|
||||
backoff(attempt)
|
||||
}
|
||||
t.Logf("warmup: cluster not confirmed writable within 60s; proceeding")
|
||||
})
|
||||
}
|
||||
|
||||
func filerClient(t *testing.T) (filer_pb.SeaweedFilerClient, func()) {
|
||||
|
||||
Reference in New Issue
Block a user